001/**
002 * 
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.fs;
020
021import java.io.DataInputStream;
022import java.io.FileDescriptor;
023import java.io.FileInputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.nio.ByteBuffer;
027import java.util.EnumSet;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.io.ByteBufferPool;
032import org.apache.hadoop.util.IdentityHashStore;
033
034/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
035 * and buffers input through a {@link java.io.BufferedInputStream}. */
036@InterfaceAudience.Public
037@InterfaceStability.Stable
038public class FSDataInputStream extends DataInputStream
039    implements Seekable, PositionedReadable, 
040      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
041      HasEnhancedByteBufferAccess, CanUnbuffer {
042  /**
043   * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
044   * objects
045   */
046  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
047    extendedReadBuffers
048      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
049
050  public FSDataInputStream(InputStream in) {
051    super(in);
052    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
053      throw new IllegalArgumentException(
054          "In is not an instance of Seekable or PositionedReadable");
055    }
056  }
057  
058  /**
059   * Seek to the given offset.
060   *
061   * @param desired offset to seek to
062   */
063  @Override
064  public void seek(long desired) throws IOException {
065    ((Seekable)in).seek(desired);
066  }
067
068  /**
069   * Get the current position in the input stream.
070   *
071   * @return current position in the input stream
072   */
073  @Override
074  public long getPos() throws IOException {
075    return ((Seekable)in).getPos();
076  }
077  
078  /**
079   * Read bytes from the given position in the stream to the given buffer.
080   *
081   * @param position  position in the input stream to seek
082   * @param buffer    buffer into which data is read
083   * @param offset    offset into the buffer in which data is written
084   * @param length    maximum number of bytes to read
085   * @return total number of bytes read into the buffer, or <code>-1</code>
086   *         if there is no more data because the end of the stream has been
087   *         reached
088   */
089  @Override
090  public int read(long position, byte[] buffer, int offset, int length)
091    throws IOException {
092    return ((PositionedReadable)in).read(position, buffer, offset, length);
093  }
094
095  /**
096   * Read bytes from the given position in the stream to the given buffer.
097   * Continues to read until <code>length</code> bytes have been read.
098   *
099   * @param position  position in the input stream to seek
100   * @param buffer    buffer into which data is read
101   * @param offset    offset into the buffer in which data is written
102   * @param length    the number of bytes to read
103   * @throws IOException IO problems
104   * @throws EOFException If the end of stream is reached while reading.
105   *                      If an exception is thrown an undetermined number
106   *                      of bytes in the buffer may have been written. 
107   */
108  @Override
109  public void readFully(long position, byte[] buffer, int offset, int length)
110    throws IOException {
111    ((PositionedReadable)in).readFully(position, buffer, offset, length);
112  }
113  
114  /**
115   * See {@link #readFully(long, byte[], int, int)}.
116   */
117  @Override
118  public void readFully(long position, byte[] buffer)
119    throws IOException {
120    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
121  }
122  
123  /**
124   * Seek to the given position on an alternate copy of the data.
125   *
126   * @param  targetPos  position to seek to
127   * @return true if a new source is found, false otherwise
128   */
129  @Override
130  public boolean seekToNewSource(long targetPos) throws IOException {
131    return ((Seekable)in).seekToNewSource(targetPos); 
132  }
133  
134  /**
135   * Get a reference to the wrapped input stream. Used by unit tests.
136   *
137   * @return the underlying input stream
138   */
139  @InterfaceAudience.LimitedPrivate({"HDFS"})
140  public InputStream getWrappedStream() {
141    return in;
142  }
143
144  @Override
145  public int read(ByteBuffer buf) throws IOException {
146    if (in instanceof ByteBufferReadable) {
147      return ((ByteBufferReadable)in).read(buf);
148    }
149
150    throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
151  }
152
153  @Override
154  public FileDescriptor getFileDescriptor() throws IOException {
155    if (in instanceof HasFileDescriptor) {
156      return ((HasFileDescriptor) in).getFileDescriptor();
157    } else if (in instanceof FileInputStream) {
158      return ((FileInputStream) in).getFD();
159    } else {
160      return null;
161    }
162  }
163
164  @Override
165  public void setReadahead(Long readahead)
166      throws IOException, UnsupportedOperationException {
167    try {
168      ((CanSetReadahead)in).setReadahead(readahead);
169    } catch (ClassCastException e) {
170      throw new UnsupportedOperationException(
171          "this stream does not support setting the readahead " +
172          "caching strategy.");
173    }
174  }
175
176  @Override
177  public void setDropBehind(Boolean dropBehind)
178      throws IOException, UnsupportedOperationException {
179    try {
180      ((CanSetDropBehind)in).setDropBehind(dropBehind);
181    } catch (ClassCastException e) {
182      throw new UnsupportedOperationException("this stream does not " +
183          "support setting the drop-behind caching setting.");
184    }
185  }
186
187  @Override
188  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
189      EnumSet<ReadOption> opts) 
190          throws IOException, UnsupportedOperationException {
191    try {
192      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
193          maxLength, opts);
194    }
195    catch (ClassCastException e) {
196      ByteBuffer buffer = ByteBufferUtil.
197          fallbackRead(this, bufferPool, maxLength);
198      if (buffer != null) {
199        extendedReadBuffers.put(buffer, bufferPool);
200      }
201      return buffer;
202    }
203  }
204
205  private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
206      EnumSet.noneOf(ReadOption.class);
207
208  final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
209          throws IOException, UnsupportedOperationException {
210    return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
211  }
212  
213  @Override
214  public void releaseBuffer(ByteBuffer buffer) {
215    try {
216      ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
217    }
218    catch (ClassCastException e) {
219      ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
220      if (bufferPool == null) {
221        throw new IllegalArgumentException("tried to release a buffer " +
222            "that was not created by this stream.");
223      }
224      bufferPool.putBuffer(buffer);
225    }
226  }
227
228  @Override
229  public void unbuffer() {
230    try {
231      ((CanUnbuffer)in).unbuffer();
232    } catch (ClassCastException e) {
233      throw new UnsupportedOperationException("this stream does not " +
234          "support unbuffering.");
235    }
236  }
237
238  /**
239   * String value. Includes the string value of the inner stream
240   * @return the stream
241   */
242  @Override
243  public String toString() {
244    return super.toString() + ": " + in;
245  }
246}