001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.fs; 020 021import java.io.*; 022import java.nio.ByteBuffer; 023import java.util.EnumSet; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.io.ByteBufferPool; 028import org.apache.hadoop.fs.ByteBufferUtil; 029import org.apache.hadoop.util.IdentityHashStore; 030 031/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} 032 * and buffers input through a {@link BufferedInputStream}. */ 033@InterfaceAudience.Public 034@InterfaceStability.Stable 035public class FSDataInputStream extends DataInputStream 036 implements Seekable, PositionedReadable, 037 ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, 038 HasEnhancedByteBufferAccess { 039 /** 040 * Map ByteBuffers that we have handed out to readers to ByteBufferPool 041 * objects 042 */ 043 private final IdentityHashStore<ByteBuffer, ByteBufferPool> 044 extendedReadBuffers 045 = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0); 046 047 public FSDataInputStream(InputStream in) { 048 super(in); 049 if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { 050 throw new IllegalArgumentException( 051 "In is not an instance of Seekable or PositionedReadable"); 052 } 053 } 054 055 /** 056 * Seek to the given offset. 057 * 058 * @param desired offset to seek to 059 */ 060 @Override 061 public synchronized void seek(long desired) throws IOException { 062 ((Seekable)in).seek(desired); 063 } 064 065 /** 066 * Get the current position in the input stream. 067 * 068 * @return current position in the input stream 069 */ 070 @Override 071 public long getPos() throws IOException { 072 return ((Seekable)in).getPos(); 073 } 074 075 /** 076 * Read bytes from the given position in the stream to the given buffer. 077 * 078 * @param position position in the input stream to seek 079 * @param buffer buffer into which data is read 080 * @param offset offset into the buffer in which data is written 081 * @param length maximum number of bytes to read 082 * @return total number of bytes read into the buffer, or <code>-1</code> 083 * if there is no more data because the end of the stream has been 084 * reached 085 */ 086 @Override 087 public int read(long position, byte[] buffer, int offset, int length) 088 throws IOException { 089 return ((PositionedReadable)in).read(position, buffer, offset, length); 090 } 091 092 /** 093 * Read bytes from the given position in the stream to the given buffer. 094 * Continues to read until <code>length</code> bytes have been read. 095 * 096 * @param position position in the input stream to seek 097 * @param buffer buffer into which data is read 098 * @param offset offset into the buffer in which data is written 099 * @param length the number of bytes to read 100 * @throws EOFException If the end of stream is reached while reading. 101 * If an exception is thrown an undetermined number 102 * of bytes in the buffer may have been written. 103 */ 104 @Override 105 public void readFully(long position, byte[] buffer, int offset, int length) 106 throws IOException { 107 ((PositionedReadable)in).readFully(position, buffer, offset, length); 108 } 109 110 /** 111 * See {@link #readFully(long, byte[], int, int)}. 112 */ 113 @Override 114 public void readFully(long position, byte[] buffer) 115 throws IOException { 116 ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length); 117 } 118 119 /** 120 * Seek to the given position on an alternate copy of the data. 121 * 122 * @param targetPos position to seek to 123 * @return true if a new source is found, false otherwise 124 */ 125 @Override 126 public boolean seekToNewSource(long targetPos) throws IOException { 127 return ((Seekable)in).seekToNewSource(targetPos); 128 } 129 130 /** 131 * Get a reference to the wrapped input stream. Used by unit tests. 132 * 133 * @return the underlying input stream 134 */ 135 @InterfaceAudience.LimitedPrivate({"HDFS"}) 136 public InputStream getWrappedStream() { 137 return in; 138 } 139 140 @Override 141 public int read(ByteBuffer buf) throws IOException { 142 if (in instanceof ByteBufferReadable) { 143 return ((ByteBufferReadable)in).read(buf); 144 } 145 146 throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); 147 } 148 149 @Override 150 public FileDescriptor getFileDescriptor() throws IOException { 151 if (in instanceof HasFileDescriptor) { 152 return ((HasFileDescriptor) in).getFileDescriptor(); 153 } else if (in instanceof FileInputStream) { 154 return ((FileInputStream) in).getFD(); 155 } else { 156 return null; 157 } 158 } 159 160 @Override 161 public void setReadahead(Long readahead) 162 throws IOException, UnsupportedOperationException { 163 try { 164 ((CanSetReadahead)in).setReadahead(readahead); 165 } catch (ClassCastException e) { 166 throw new UnsupportedOperationException( 167 "this stream does not support setting the readahead " + 168 "caching strategy."); 169 } 170 } 171 172 @Override 173 public void setDropBehind(Boolean dropBehind) 174 throws IOException, UnsupportedOperationException { 175 try { 176 ((CanSetDropBehind)in).setDropBehind(dropBehind); 177 } catch (ClassCastException e) { 178 throw new UnsupportedOperationException("this stream does not " + 179 "support setting the drop-behind caching setting."); 180 } 181 } 182 183 @Override 184 public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, 185 EnumSet<ReadOption> opts) 186 throws IOException, UnsupportedOperationException { 187 try { 188 return ((HasEnhancedByteBufferAccess)in).read(bufferPool, 189 maxLength, opts); 190 } 191 catch (ClassCastException e) { 192 ByteBuffer buffer = ByteBufferUtil. 193 fallbackRead(this, bufferPool, maxLength); 194 if (buffer != null) { 195 extendedReadBuffers.put(buffer, bufferPool); 196 } 197 return buffer; 198 } 199 } 200 201 private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET = 202 EnumSet.noneOf(ReadOption.class); 203 204 final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength) 205 throws IOException, UnsupportedOperationException { 206 return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET); 207 } 208 209 @Override 210 public void releaseBuffer(ByteBuffer buffer) { 211 try { 212 ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer); 213 } 214 catch (ClassCastException e) { 215 ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer); 216 if (bufferPool == null) { 217 throw new IllegalArgumentException("tried to release a buffer " + 218 "that was not created by this stream."); 219 } 220 bufferPool.putBuffer(buffer); 221 } 222 } 223}