001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019 package org.apache.hadoop.fs;
020
021 import java.io.*;
022 import java.nio.ByteBuffer;
023 import java.util.EnumSet;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.io.ByteBufferPool;
028 import org.apache.hadoop.fs.ByteBufferUtil;
029 import org.apache.hadoop.util.IdentityHashStore;
030
031 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
032 * and buffers input through a {@link BufferedInputStream}. */
033 @InterfaceAudience.Public
034 @InterfaceStability.Stable
035 public class FSDataInputStream extends DataInputStream
036 implements Seekable, PositionedReadable,
037 ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
038 HasEnhancedByteBufferAccess {
039 /**
040 * Map ByteBuffers that we have handed out to readers to ByteBufferPool
041 * objects
042 */
043 private final IdentityHashStore<ByteBuffer, ByteBufferPool>
044 extendedReadBuffers
045 = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
046
047 public FSDataInputStream(InputStream in) {
048 super(in);
049 if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
050 throw new IllegalArgumentException(
051 "In is not an instance of Seekable or PositionedReadable");
052 }
053 }
054
055 /**
056 * Seek to the given offset.
057 *
058 * @param desired offset to seek to
059 */
060 @Override
061 public synchronized void seek(long desired) throws IOException {
062 ((Seekable)in).seek(desired);
063 }
064
065 /**
066 * Get the current position in the input stream.
067 *
068 * @return current position in the input stream
069 */
070 @Override
071 public long getPos() throws IOException {
072 return ((Seekable)in).getPos();
073 }
074
075 /**
076 * Read bytes from the given position in the stream to the given buffer.
077 *
078 * @param position position in the input stream to seek
079 * @param buffer buffer into which data is read
080 * @param offset offset into the buffer in which data is written
081 * @param length maximum number of bytes to read
082 * @return total number of bytes read into the buffer, or <code>-1</code>
083 * if there is no more data because the end of the stream has been
084 * reached
085 */
086 @Override
087 public int read(long position, byte[] buffer, int offset, int length)
088 throws IOException {
089 return ((PositionedReadable)in).read(position, buffer, offset, length);
090 }
091
092 /**
093 * Read bytes from the given position in the stream to the given buffer.
094 * Continues to read until <code>length</code> bytes have been read.
095 *
096 * @param position position in the input stream to seek
097 * @param buffer buffer into which data is read
098 * @param offset offset into the buffer in which data is written
099 * @param length the number of bytes to read
100 * @throws EOFException If the end of stream is reached while reading.
101 * If an exception is thrown an undetermined number
102 * of bytes in the buffer may have been written.
103 */
104 @Override
105 public void readFully(long position, byte[] buffer, int offset, int length)
106 throws IOException {
107 ((PositionedReadable)in).readFully(position, buffer, offset, length);
108 }
109
110 /**
111 * See {@link #readFully(long, byte[], int, int)}.
112 */
113 @Override
114 public void readFully(long position, byte[] buffer)
115 throws IOException {
116 ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
117 }
118
119 /**
120 * Seek to the given position on an alternate copy of the data.
121 *
122 * @param targetPos position to seek to
123 * @return true if a new source is found, false otherwise
124 */
125 @Override
126 public boolean seekToNewSource(long targetPos) throws IOException {
127 return ((Seekable)in).seekToNewSource(targetPos);
128 }
129
130 /**
131 * Get a reference to the wrapped input stream. Used by unit tests.
132 *
133 * @return the underlying input stream
134 */
135 @InterfaceAudience.LimitedPrivate({"HDFS"})
136 public InputStream getWrappedStream() {
137 return in;
138 }
139
140 @Override
141 public int read(ByteBuffer buf) throws IOException {
142 if (in instanceof ByteBufferReadable) {
143 return ((ByteBufferReadable)in).read(buf);
144 }
145
146 throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
147 }
148
149 @Override
150 public FileDescriptor getFileDescriptor() throws IOException {
151 if (in instanceof HasFileDescriptor) {
152 return ((HasFileDescriptor) in).getFileDescriptor();
153 } else if (in instanceof FileInputStream) {
154 return ((FileInputStream) in).getFD();
155 } else {
156 return null;
157 }
158 }
159
160 @Override
161 public void setReadahead(Long readahead)
162 throws IOException, UnsupportedOperationException {
163 try {
164 ((CanSetReadahead)in).setReadahead(readahead);
165 } catch (ClassCastException e) {
166 throw new UnsupportedOperationException(
167 "this stream does not support setting the readahead " +
168 "caching strategy.");
169 }
170 }
171
172 @Override
173 public void setDropBehind(Boolean dropBehind)
174 throws IOException, UnsupportedOperationException {
175 try {
176 ((CanSetDropBehind)in).setDropBehind(dropBehind);
177 } catch (ClassCastException e) {
178 throw new UnsupportedOperationException("this stream does not " +
179 "support setting the drop-behind caching setting.");
180 }
181 }
182
183 @Override
184 public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
185 EnumSet<ReadOption> opts)
186 throws IOException, UnsupportedOperationException {
187 try {
188 return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
189 maxLength, opts);
190 }
191 catch (ClassCastException e) {
192 ByteBuffer buffer = ByteBufferUtil.
193 fallbackRead(this, bufferPool, maxLength);
194 if (buffer != null) {
195 extendedReadBuffers.put(buffer, bufferPool);
196 }
197 return buffer;
198 }
199 }
200
201 private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
202 EnumSet.noneOf(ReadOption.class);
203
204 final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
205 throws IOException, UnsupportedOperationException {
206 return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
207 }
208
209 @Override
210 public void releaseBuffer(ByteBuffer buffer) {
211 try {
212 ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
213 }
214 catch (ClassCastException e) {
215 ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
216 if (bufferPool == null) {
217 throw new IllegalArgumentException("tried to release a buffer " +
218 "that was not created by this stream.");
219 }
220 bufferPool.putBuffer(buffer);
221 }
222 }
223 }