001    /**
002     * 
003     * Licensed to the Apache Software Foundation (ASF) under one
004     * or more contributor license agreements.  See the NOTICE file
005     * distributed with this work for additional information
006     * regarding copyright ownership.  The ASF licenses this file
007     * to you under the Apache License, Version 2.0 (the
008     * "License"); you may not use this file except in compliance
009     * with the License.  You may obtain a copy of the License at
010     *
011     *     http://www.apache.org/licenses/LICENSE-2.0
012     *
013     * Unless required by applicable law or agreed to in writing, software
014     * distributed under the License is distributed on an "AS IS" BASIS,
015     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016     * See the License for the specific language governing permissions and
017     * limitations under the License.
018     */
019    package org.apache.hadoop.fs;
020    
021    import java.io.*;
022    import java.nio.ByteBuffer;
023    import java.util.EnumSet;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.io.ByteBufferPool;
028    import org.apache.hadoop.fs.ByteBufferUtil;
029    import org.apache.hadoop.util.IdentityHashStore;
030    
031    /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
032     * and buffers input through a {@link BufferedInputStream}. */
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class FSDataInputStream extends DataInputStream
036        implements Seekable, PositionedReadable, 
037          ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
038          HasEnhancedByteBufferAccess {
039      /**
040       * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
041       * objects
042       */
043      private final IdentityHashStore<ByteBuffer, ByteBufferPool>
044        extendedReadBuffers
045          = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
046    
047      public FSDataInputStream(InputStream in) {
048        super(in);
049        if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
050          throw new IllegalArgumentException(
051              "In is not an instance of Seekable or PositionedReadable");
052        }
053      }
054      
055      /**
056       * Seek to the given offset.
057       *
058       * @param desired offset to seek to
059       */
060      @Override
061      public synchronized void seek(long desired) throws IOException {
062        ((Seekable)in).seek(desired);
063      }
064    
065      /**
066       * Get the current position in the input stream.
067       *
068       * @return current position in the input stream
069       */
070      @Override
071      public long getPos() throws IOException {
072        return ((Seekable)in).getPos();
073      }
074      
075      /**
076       * Read bytes from the given position in the stream to the given buffer.
077       *
078       * @param position  position in the input stream to seek
079       * @param buffer    buffer into which data is read
080       * @param offset    offset into the buffer in which data is written
081       * @param length    maximum number of bytes to read
082       * @return total number of bytes read into the buffer, or <code>-1</code>
083       *         if there is no more data because the end of the stream has been
084       *         reached
085       */
086      @Override
087      public int read(long position, byte[] buffer, int offset, int length)
088        throws IOException {
089        return ((PositionedReadable)in).read(position, buffer, offset, length);
090      }
091    
092      /**
093       * Read bytes from the given position in the stream to the given buffer.
094       * Continues to read until <code>length</code> bytes have been read.
095       *
096       * @param position  position in the input stream to seek
097       * @param buffer    buffer into which data is read
098       * @param offset    offset into the buffer in which data is written
099       * @param length    the number of bytes to read
100       * @throws EOFException If the end of stream is reached while reading.
101       *                      If an exception is thrown an undetermined number
102       *                      of bytes in the buffer may have been written. 
103       */
104      @Override
105      public void readFully(long position, byte[] buffer, int offset, int length)
106        throws IOException {
107        ((PositionedReadable)in).readFully(position, buffer, offset, length);
108      }
109      
110      /**
111       * See {@link #readFully(long, byte[], int, int)}.
112       */
113      @Override
114      public void readFully(long position, byte[] buffer)
115        throws IOException {
116        ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
117      }
118      
119      /**
120       * Seek to the given position on an alternate copy of the data.
121       *
122       * @param  targetPos  position to seek to
123       * @return true if a new source is found, false otherwise
124       */
125      @Override
126      public boolean seekToNewSource(long targetPos) throws IOException {
127        return ((Seekable)in).seekToNewSource(targetPos); 
128      }
129      
130      /**
131       * Get a reference to the wrapped input stream. Used by unit tests.
132       *
133       * @return the underlying input stream
134       */
135      @InterfaceAudience.LimitedPrivate({"HDFS"})
136      public InputStream getWrappedStream() {
137        return in;
138      }
139    
140      @Override
141      public int read(ByteBuffer buf) throws IOException {
142        if (in instanceof ByteBufferReadable) {
143          return ((ByteBufferReadable)in).read(buf);
144        }
145    
146        throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
147      }
148    
149      @Override
150      public FileDescriptor getFileDescriptor() throws IOException {
151        if (in instanceof HasFileDescriptor) {
152          return ((HasFileDescriptor) in).getFileDescriptor();
153        } else if (in instanceof FileInputStream) {
154          return ((FileInputStream) in).getFD();
155        } else {
156          return null;
157        }
158      }
159    
160      @Override
161      public void setReadahead(Long readahead)
162          throws IOException, UnsupportedOperationException {
163        try {
164          ((CanSetReadahead)in).setReadahead(readahead);
165        } catch (ClassCastException e) {
166          throw new UnsupportedOperationException(
167              "this stream does not support setting the readahead " +
168              "caching strategy.");
169        }
170      }
171    
172      @Override
173      public void setDropBehind(Boolean dropBehind)
174          throws IOException, UnsupportedOperationException {
175        try {
176          ((CanSetDropBehind)in).setDropBehind(dropBehind);
177        } catch (ClassCastException e) {
178          throw new UnsupportedOperationException("this stream does not " +
179              "support setting the drop-behind caching setting.");
180        }
181      }
182    
183      @Override
184      public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
185          EnumSet<ReadOption> opts) 
186              throws IOException, UnsupportedOperationException {
187        try {
188          return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
189              maxLength, opts);
190        }
191        catch (ClassCastException e) {
192          ByteBuffer buffer = ByteBufferUtil.
193              fallbackRead(this, bufferPool, maxLength);
194          if (buffer != null) {
195            extendedReadBuffers.put(buffer, bufferPool);
196          }
197          return buffer;
198        }
199      }
200    
201      private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
202          EnumSet.noneOf(ReadOption.class);
203    
204      final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
205              throws IOException, UnsupportedOperationException {
206        return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
207      }
208      
209      @Override
210      public void releaseBuffer(ByteBuffer buffer) {
211        try {
212          ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
213        }
214        catch (ClassCastException e) {
215          ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
216          if (bufferPool == null) {
217            throw new IllegalArgumentException("tried to release a buffer " +
218                "that was not created by this stream.");
219          }
220          bufferPool.putBuffer(buffer);
221        }
222      }
223    }