001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.*;
021
022import org.apache.hadoop.classification.InterfaceAudience;
023import org.apache.hadoop.classification.InterfaceStability;
024
025/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
026 * and buffers input through a {@link BufferedInputStream}. */
027@InterfaceAudience.Public
028@InterfaceStability.Stable
029public class FSDataInputStream extends DataInputStream
030    implements Seekable, PositionedReadable, Closeable {
031
032  public FSDataInputStream(InputStream in)
033    throws IOException {
034    super(in);
035    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
036      throw new IllegalArgumentException(
037          "In is not an instance of Seekable or PositionedReadable");
038    }
039  }
040  
041  /**
042   * Seek to the given offset.
043   *
044   * @param desired offset to seek to
045   */
046  public synchronized void seek(long desired) throws IOException {
047    ((Seekable)in).seek(desired);
048  }
049
050  /**
051   * Get the current position in the input stream.
052   *
053   * @return current position in the input stream
054   */
055  public long getPos() throws IOException {
056    return ((Seekable)in).getPos();
057  }
058  
059  /**
060   * Read bytes from the given position in the stream to the given buffer.
061   *
062   * @param position  position in the input stream to seek
063   * @param buffer    buffer into which data is read
064   * @param offset    offset into the buffer in which data is written
065   * @param length    maximum number of bytes to read
066   * @return total number of bytes read into the buffer, or <code>-1</code>
067   *         if there is no more data because the end of the stream has been
068   *         reached
069   */
070  public int read(long position, byte[] buffer, int offset, int length)
071    throws IOException {
072    return ((PositionedReadable)in).read(position, buffer, offset, length);
073  }
074
075  /**
076   * Read bytes from the given position in the stream to the given buffer.
077   * Continues to read until <code>length</code> bytes have been read.
078   *
079   * @param position  position in the input stream to seek
080   * @param buffer    buffer into which data is read
081   * @param offset    offset into the buffer in which data is written
082   * @param length    the number of bytes to read
083   * @throws EOFException If the end of stream is reached while reading.
084   *                      If an exception is thrown an undetermined number
085   *                      of bytes in the buffer may have been written. 
086   */
087  public void readFully(long position, byte[] buffer, int offset, int length)
088    throws IOException {
089    ((PositionedReadable)in).readFully(position, buffer, offset, length);
090  }
091  
092  /**
093   * See {@link #readFully(long, byte[], int, int)}.
094   */
095  public void readFully(long position, byte[] buffer)
096    throws IOException {
097    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
098  }
099  
100  /**
101   * Seek to the given position on an alternate copy of the data.
102   *
103   * @param  targetPos  position to seek to
104   * @return true if a new source is found, false otherwise
105   */
106  public boolean seekToNewSource(long targetPos) throws IOException {
107    return ((Seekable)in).seekToNewSource(targetPos); 
108  }
109  
110  /**
111   * Get a reference to the wrapped input stream. Used by unit tests.
112   *
113   * @return the underlying input stream
114   */
115  @InterfaceAudience.LimitedPrivate({"HDFS"})
116  public InputStream getWrappedStream() {
117    return in;
118  }
119}