001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.fs; 020 021import java.io.DataInputStream; 022import java.io.FileDescriptor; 023import java.io.FileInputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.nio.ByteBuffer; 027import java.util.EnumSet; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.io.ByteBufferPool; 032import org.apache.hadoop.util.IdentityHashStore; 033 034/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} 035 * and buffers input through a {@link java.io.BufferedInputStream}. */ 036@InterfaceAudience.Public 037@InterfaceStability.Stable 038public class FSDataInputStream extends DataInputStream 039 implements Seekable, PositionedReadable, 040 ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, 041 HasEnhancedByteBufferAccess, CanUnbuffer { 042 /** 043 * Map ByteBuffers that we have handed out to readers to ByteBufferPool 044 * objects 045 */ 046 private final IdentityHashStore<ByteBuffer, ByteBufferPool> 047 extendedReadBuffers 048 = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0); 049 050 public FSDataInputStream(InputStream in) { 051 super(in); 052 if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { 053 throw new IllegalArgumentException( 054 "In is not an instance of Seekable or PositionedReadable"); 055 } 056 } 057 058 /** 059 * Seek to the given offset. 060 * 061 * @param desired offset to seek to 062 */ 063 @Override 064 public void seek(long desired) throws IOException { 065 ((Seekable)in).seek(desired); 066 } 067 068 /** 069 * Get the current position in the input stream. 070 * 071 * @return current position in the input stream 072 */ 073 @Override 074 public long getPos() throws IOException { 075 return ((Seekable)in).getPos(); 076 } 077 078 /** 079 * Read bytes from the given position in the stream to the given buffer. 080 * 081 * @param position position in the input stream to seek 082 * @param buffer buffer into which data is read 083 * @param offset offset into the buffer in which data is written 084 * @param length maximum number of bytes to read 085 * @return total number of bytes read into the buffer, or <code>-1</code> 086 * if there is no more data because the end of the stream has been 087 * reached 088 */ 089 @Override 090 public int read(long position, byte[] buffer, int offset, int length) 091 throws IOException { 092 return ((PositionedReadable)in).read(position, buffer, offset, length); 093 } 094 095 /** 096 * Read bytes from the given position in the stream to the given buffer. 097 * Continues to read until <code>length</code> bytes have been read. 098 * 099 * @param position position in the input stream to seek 100 * @param buffer buffer into which data is read 101 * @param offset offset into the buffer in which data is written 102 * @param length the number of bytes to read 103 * @throws IOException IO problems 104 * @throws EOFException If the end of stream is reached while reading. 105 * If an exception is thrown an undetermined number 106 * of bytes in the buffer may have been written. 107 */ 108 @Override 109 public void readFully(long position, byte[] buffer, int offset, int length) 110 throws IOException { 111 ((PositionedReadable)in).readFully(position, buffer, offset, length); 112 } 113 114 /** 115 * See {@link #readFully(long, byte[], int, int)}. 116 */ 117 @Override 118 public void readFully(long position, byte[] buffer) 119 throws IOException { 120 ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length); 121 } 122 123 /** 124 * Seek to the given position on an alternate copy of the data. 125 * 126 * @param targetPos position to seek to 127 * @return true if a new source is found, false otherwise 128 */ 129 @Override 130 public boolean seekToNewSource(long targetPos) throws IOException { 131 return ((Seekable)in).seekToNewSource(targetPos); 132 } 133 134 /** 135 * Get a reference to the wrapped input stream. Used by unit tests. 136 * 137 * @return the underlying input stream 138 */ 139 @InterfaceAudience.LimitedPrivate({"HDFS"}) 140 public InputStream getWrappedStream() { 141 return in; 142 } 143 144 @Override 145 public int read(ByteBuffer buf) throws IOException { 146 if (in instanceof ByteBufferReadable) { 147 return ((ByteBufferReadable)in).read(buf); 148 } 149 150 throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); 151 } 152 153 @Override 154 public FileDescriptor getFileDescriptor() throws IOException { 155 if (in instanceof HasFileDescriptor) { 156 return ((HasFileDescriptor) in).getFileDescriptor(); 157 } else if (in instanceof FileInputStream) { 158 return ((FileInputStream) in).getFD(); 159 } else { 160 return null; 161 } 162 } 163 164 @Override 165 public void setReadahead(Long readahead) 166 throws IOException, UnsupportedOperationException { 167 try { 168 ((CanSetReadahead)in).setReadahead(readahead); 169 } catch (ClassCastException e) { 170 throw new UnsupportedOperationException( 171 "this stream does not support setting the readahead " + 172 "caching strategy."); 173 } 174 } 175 176 @Override 177 public void setDropBehind(Boolean dropBehind) 178 throws IOException, UnsupportedOperationException { 179 try { 180 ((CanSetDropBehind)in).setDropBehind(dropBehind); 181 } catch (ClassCastException e) { 182 throw new UnsupportedOperationException("this stream does not " + 183 "support setting the drop-behind caching setting."); 184 } 185 } 186 187 @Override 188 public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, 189 EnumSet<ReadOption> opts) 190 throws IOException, UnsupportedOperationException { 191 try { 192 return ((HasEnhancedByteBufferAccess)in).read(bufferPool, 193 maxLength, opts); 194 } 195 catch (ClassCastException e) { 196 ByteBuffer buffer = ByteBufferUtil. 197 fallbackRead(this, bufferPool, maxLength); 198 if (buffer != null) { 199 extendedReadBuffers.put(buffer, bufferPool); 200 } 201 return buffer; 202 } 203 } 204 205 private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET = 206 EnumSet.noneOf(ReadOption.class); 207 208 final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength) 209 throws IOException, UnsupportedOperationException { 210 return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET); 211 } 212 213 @Override 214 public void releaseBuffer(ByteBuffer buffer) { 215 try { 216 ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer); 217 } 218 catch (ClassCastException e) { 219 ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer); 220 if (bufferPool == null) { 221 throw new IllegalArgumentException("tried to release a buffer " + 222 "that was not created by this stream."); 223 } 224 bufferPool.putBuffer(buffer); 225 } 226 } 227 228 @Override 229 public void unbuffer() { 230 try { 231 ((CanUnbuffer)in).unbuffer(); 232 } catch (ClassCastException e) { 233 throw new UnsupportedOperationException("this stream does not " + 234 "support unbuffering."); 235 } 236 } 237 238 /** 239 * String value. Includes the string value of the inner stream 240 * @return the stream 241 */ 242 @Override 243 public String toString() { 244 return super.toString() + ": " + in; 245 } 246}