001    /**
002     * 
003     * Licensed to the Apache Software Foundation (ASF) under one
004     * or more contributor license agreements.  See the NOTICE file
005     * distributed with this work for additional information
006     * regarding copyright ownership.  The ASF licenses this file
007     * to you under the Apache License, Version 2.0 (the
008     * "License"); you may not use this file except in compliance
009     * with the License.  You may obtain a copy of the License at
010     *
011     *     http://www.apache.org/licenses/LICENSE-2.0
012     *
013     * Unless required by applicable law or agreed to in writing, software
014     * distributed under the License is distributed on an "AS IS" BASIS,
015     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016     * See the License for the specific language governing permissions and
017     * limitations under the License.
018     */
019    package org.apache.hadoop.fs;
020    
021    import java.io.*;
022    import java.nio.ByteBuffer;
023    import java.util.EnumSet;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.io.ByteBufferPool;
028    import org.apache.hadoop.fs.ByteBufferUtil;
029    import org.apache.hadoop.util.IdentityHashStore;
030    
031    /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
032     * and buffers input through a {@link BufferedInputStream}. */
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class FSDataInputStream extends DataInputStream
036        implements Seekable, PositionedReadable, Closeable, 
037          ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
038          HasEnhancedByteBufferAccess {
039      /**
040       * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
041       * objects
042       */
043      private final IdentityHashStore<ByteBuffer, ByteBufferPool>
044        extendedReadBuffers
045          = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
046    
047      public FSDataInputStream(InputStream in)
048        throws IOException {
049        super(in);
050        if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
051          throw new IllegalArgumentException(
052              "In is not an instance of Seekable or PositionedReadable");
053        }
054      }
055      
056      /**
057       * Seek to the given offset.
058       *
059       * @param desired offset to seek to
060       */
061      @Override
062      public synchronized void seek(long desired) throws IOException {
063        ((Seekable)in).seek(desired);
064      }
065    
066      /**
067       * Get the current position in the input stream.
068       *
069       * @return current position in the input stream
070       */
071      @Override
072      public long getPos() throws IOException {
073        return ((Seekable)in).getPos();
074      }
075      
076      /**
077       * Read bytes from the given position in the stream to the given buffer.
078       *
079       * @param position  position in the input stream to seek
080       * @param buffer    buffer into which data is read
081       * @param offset    offset into the buffer in which data is written
082       * @param length    maximum number of bytes to read
083       * @return total number of bytes read into the buffer, or <code>-1</code>
084       *         if there is no more data because the end of the stream has been
085       *         reached
086       */
087      @Override
088      public int read(long position, byte[] buffer, int offset, int length)
089        throws IOException {
090        return ((PositionedReadable)in).read(position, buffer, offset, length);
091      }
092    
093      /**
094       * Read bytes from the given position in the stream to the given buffer.
095       * Continues to read until <code>length</code> bytes have been read.
096       *
097       * @param position  position in the input stream to seek
098       * @param buffer    buffer into which data is read
099       * @param offset    offset into the buffer in which data is written
100       * @param length    the number of bytes to read
101       * @throws EOFException If the end of stream is reached while reading.
102       *                      If an exception is thrown an undetermined number
103       *                      of bytes in the buffer may have been written. 
104       */
105      @Override
106      public void readFully(long position, byte[] buffer, int offset, int length)
107        throws IOException {
108        ((PositionedReadable)in).readFully(position, buffer, offset, length);
109      }
110      
111      /**
112       * See {@link #readFully(long, byte[], int, int)}.
113       */
114      @Override
115      public void readFully(long position, byte[] buffer)
116        throws IOException {
117        ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
118      }
119      
120      /**
121       * Seek to the given position on an alternate copy of the data.
122       *
123       * @param  targetPos  position to seek to
124       * @return true if a new source is found, false otherwise
125       */
126      @Override
127      public boolean seekToNewSource(long targetPos) throws IOException {
128        return ((Seekable)in).seekToNewSource(targetPos); 
129      }
130      
131      /**
132       * Get a reference to the wrapped input stream. Used by unit tests.
133       *
134       * @return the underlying input stream
135       */
136      @InterfaceAudience.LimitedPrivate({"HDFS"})
137      public InputStream getWrappedStream() {
138        return in;
139      }
140    
141      @Override
142      public int read(ByteBuffer buf) throws IOException {
143        if (in instanceof ByteBufferReadable) {
144          return ((ByteBufferReadable)in).read(buf);
145        }
146    
147        throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
148      }
149    
150      @Override
151      public FileDescriptor getFileDescriptor() throws IOException {
152        if (in instanceof HasFileDescriptor) {
153          return ((HasFileDescriptor) in).getFileDescriptor();
154        } else if (in instanceof FileInputStream) {
155          return ((FileInputStream) in).getFD();
156        } else {
157          return null;
158        }
159      }
160    
161      @Override
162      public void setReadahead(Long readahead)
163          throws IOException, UnsupportedOperationException {
164        try {
165          ((CanSetReadahead)in).setReadahead(readahead);
166        } catch (ClassCastException e) {
167          throw new UnsupportedOperationException(
168              "this stream does not support setting the readahead " +
169              "caching strategy.");
170        }
171      }
172    
173      @Override
174      public void setDropBehind(Boolean dropBehind)
175          throws IOException, UnsupportedOperationException {
176        try {
177          ((CanSetDropBehind)in).setDropBehind(dropBehind);
178        } catch (ClassCastException e) {
179          throw new UnsupportedOperationException("this stream does not " +
180              "support setting the drop-behind caching setting.");
181        }
182      }
183    
184      @Override
185      public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
186          EnumSet<ReadOption> opts) 
187              throws IOException, UnsupportedOperationException {
188        try {
189          return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
190              maxLength, opts);
191        }
192        catch (ClassCastException e) {
193          ByteBuffer buffer = ByteBufferUtil.
194              fallbackRead(this, bufferPool, maxLength);
195          if (buffer != null) {
196            extendedReadBuffers.put(buffer, bufferPool);
197          }
198          return buffer;
199        }
200      }
201    
202      private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
203          EnumSet.noneOf(ReadOption.class);
204    
205      final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
206              throws IOException, UnsupportedOperationException {
207        return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
208      }
209      
210      @Override
211      public void releaseBuffer(ByteBuffer buffer) {
212        try {
213          ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
214        }
215        catch (ClassCastException e) {
216          ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
217          if (bufferPool == null) {
218            throw new IllegalArgumentException("tried to release a buffer " +
219                "that was not created by this stream.");
220          }
221          bufferPool.putBuffer(buffer);
222        }
223      }
224    }