001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 package org.apache.hadoop.fs; 020 021 import java.io.*; 022 import java.nio.ByteBuffer; 023 import java.util.EnumSet; 024 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceStability; 027 import org.apache.hadoop.io.ByteBufferPool; 028 import org.apache.hadoop.fs.ByteBufferUtil; 029 import org.apache.hadoop.util.IdentityHashStore; 030 031 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} 032 * and buffers input through a {@link BufferedInputStream}. */ 033 @InterfaceAudience.Public 034 @InterfaceStability.Stable 035 public class FSDataInputStream extends DataInputStream 036 implements Seekable, PositionedReadable, Closeable, 037 ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, 038 HasEnhancedByteBufferAccess { 039 /** 040 * Map ByteBuffers that we have handed out to readers to ByteBufferPool 041 * objects 042 */ 043 private final IdentityHashStore<ByteBuffer, ByteBufferPool> 044 extendedReadBuffers 045 = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0); 046 047 public FSDataInputStream(InputStream in) 048 throws IOException { 049 super(in); 050 if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { 051 throw new IllegalArgumentException( 052 "In is not an instance of Seekable or PositionedReadable"); 053 } 054 } 055 056 /** 057 * Seek to the given offset. 058 * 059 * @param desired offset to seek to 060 */ 061 @Override 062 public synchronized void seek(long desired) throws IOException { 063 ((Seekable)in).seek(desired); 064 } 065 066 /** 067 * Get the current position in the input stream. 068 * 069 * @return current position in the input stream 070 */ 071 @Override 072 public long getPos() throws IOException { 073 return ((Seekable)in).getPos(); 074 } 075 076 /** 077 * Read bytes from the given position in the stream to the given buffer. 078 * 079 * @param position position in the input stream to seek 080 * @param buffer buffer into which data is read 081 * @param offset offset into the buffer in which data is written 082 * @param length maximum number of bytes to read 083 * @return total number of bytes read into the buffer, or <code>-1</code> 084 * if there is no more data because the end of the stream has been 085 * reached 086 */ 087 @Override 088 public int read(long position, byte[] buffer, int offset, int length) 089 throws IOException { 090 return ((PositionedReadable)in).read(position, buffer, offset, length); 091 } 092 093 /** 094 * Read bytes from the given position in the stream to the given buffer. 095 * Continues to read until <code>length</code> bytes have been read. 096 * 097 * @param position position in the input stream to seek 098 * @param buffer buffer into which data is read 099 * @param offset offset into the buffer in which data is written 100 * @param length the number of bytes to read 101 * @throws EOFException If the end of stream is reached while reading. 102 * If an exception is thrown an undetermined number 103 * of bytes in the buffer may have been written. 104 */ 105 @Override 106 public void readFully(long position, byte[] buffer, int offset, int length) 107 throws IOException { 108 ((PositionedReadable)in).readFully(position, buffer, offset, length); 109 } 110 111 /** 112 * See {@link #readFully(long, byte[], int, int)}. 113 */ 114 @Override 115 public void readFully(long position, byte[] buffer) 116 throws IOException { 117 ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length); 118 } 119 120 /** 121 * Seek to the given position on an alternate copy of the data. 122 * 123 * @param targetPos position to seek to 124 * @return true if a new source is found, false otherwise 125 */ 126 @Override 127 public boolean seekToNewSource(long targetPos) throws IOException { 128 return ((Seekable)in).seekToNewSource(targetPos); 129 } 130 131 /** 132 * Get a reference to the wrapped input stream. Used by unit tests. 133 * 134 * @return the underlying input stream 135 */ 136 @InterfaceAudience.LimitedPrivate({"HDFS"}) 137 public InputStream getWrappedStream() { 138 return in; 139 } 140 141 @Override 142 public int read(ByteBuffer buf) throws IOException { 143 if (in instanceof ByteBufferReadable) { 144 return ((ByteBufferReadable)in).read(buf); 145 } 146 147 throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); 148 } 149 150 @Override 151 public FileDescriptor getFileDescriptor() throws IOException { 152 if (in instanceof HasFileDescriptor) { 153 return ((HasFileDescriptor) in).getFileDescriptor(); 154 } else if (in instanceof FileInputStream) { 155 return ((FileInputStream) in).getFD(); 156 } else { 157 return null; 158 } 159 } 160 161 @Override 162 public void setReadahead(Long readahead) 163 throws IOException, UnsupportedOperationException { 164 try { 165 ((CanSetReadahead)in).setReadahead(readahead); 166 } catch (ClassCastException e) { 167 throw new UnsupportedOperationException( 168 "this stream does not support setting the readahead " + 169 "caching strategy."); 170 } 171 } 172 173 @Override 174 public void setDropBehind(Boolean dropBehind) 175 throws IOException, UnsupportedOperationException { 176 try { 177 ((CanSetDropBehind)in).setDropBehind(dropBehind); 178 } catch (ClassCastException e) { 179 throw new UnsupportedOperationException("this stream does not " + 180 "support setting the drop-behind caching setting."); 181 } 182 } 183 184 @Override 185 public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, 186 EnumSet<ReadOption> opts) 187 throws IOException, UnsupportedOperationException { 188 try { 189 return ((HasEnhancedByteBufferAccess)in).read(bufferPool, 190 maxLength, opts); 191 } 192 catch (ClassCastException e) { 193 ByteBuffer buffer = ByteBufferUtil. 194 fallbackRead(this, bufferPool, maxLength); 195 if (buffer != null) { 196 extendedReadBuffers.put(buffer, bufferPool); 197 } 198 return buffer; 199 } 200 } 201 202 private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET = 203 EnumSet.noneOf(ReadOption.class); 204 205 final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength) 206 throws IOException, UnsupportedOperationException { 207 return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET); 208 } 209 210 @Override 211 public void releaseBuffer(ByteBuffer buffer) { 212 try { 213 ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer); 214 } 215 catch (ClassCastException e) { 216 ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer); 217 if (bufferPool == null) { 218 throw new IllegalArgumentException("tried to release a buffer " + 219 "that was not created by this stream."); 220 } 221 bufferPool.putBuffer(buffer); 222 } 223 } 224 }