/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/****************************************************************
 * FSInputStream is a generic old InputStream with a little bit
 * of RAF-style seek ability.
 *
 * <p>On top of the abstract seek operations, this class supplies default
 * implementations of the positioned-read operations
 * ({@link #read(long, byte[], int, int)} and the {@code readFully}
 * variants) built from {@link #seek(long)}, {@link #getPos()} and
 * {@link #read(byte[], int, int)}.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class FSInputStream extends InputStream
    implements Seekable, PositionedReadable {
  private static final Logger LOG =
      LoggerFactory.getLogger(FSInputStream.class);

  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location.  Can't
   * seek past the end of the file.
   *
   * @param pos offset from the start of the file
   * @throws IOException if the seek fails
   */
  @Override
  public abstract void seek(long pos) throws IOException;

  /**
   * Return the current offset from the start of the file.
   *
   * @return the current offset from the start of the file
   * @throws IOException if the position cannot be determined
   */
  @Override
  public abstract long getPos() throws IOException;

  /**
   * Seeks a different copy of the data.  Returns true if
   * found a new source, false otherwise.
   *
   * @param targetPos position to seek to in the new source
   * @return true if a new source was found, false otherwise
   * @throws IOException if the operation fails
   */
  @Override
  public abstract boolean seekToNewSource(long targetPos) throws IOException;

  /**
   * Positioned read: read up to {@code length} bytes starting at file
   * offset {@code position} into {@code buffer}, then restore the stream
   * to the position it had before the call.
   *
   * <p>The arguments are checked with
   * {@link #validatePositionedReadArgs(long, byte[], int, int)}; a zero
   * {@code length} returns 0 without touching the stream. The
   * seek/read/seek-back sequence runs while holding this object's monitor.
   * An {@link EOFException} raised by the seek or the read is logged at
   * debug level and reported to the caller as a return value of -1.
   *
   * @param position file offset to read from
   * @param buffer destination buffer
   * @param offset index in {@code buffer} at which to store the first byte
   * @param length maximum number of bytes to read
   * @return the number of bytes read, 0 if {@code length} is 0, or -1 if an
   * {@link EOFException} was raised (otherwise whatever
   * {@link #read(byte[], int, int)} returned)
   * @throws IOException if the arguments are rejected with an
   * {@link EOFException}, or a seek or read fails for another reason
   */
  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    validatePositionedReadArgs(position, buffer, offset, length);
    if (length == 0) {
      return 0;
    }
    synchronized (this) {
      // remember where the stream was so that it can be put back afterwards
      long oldPos = getPos();
      int nread = -1;
      try {
        seek(position);
        nread = read(buffer, offset, length);
      } catch (EOFException e) {
        // end of file; this can be raised by some filesystems
        // (often: object stores); it is swallowed here.
        // Log the file position of the attempted read, not the index
        // into the destination buffer.
        LOG.debug("Downgrading EOFException raised trying to" +
            " read {} bytes at offset {}", length, position, e);
      } finally {
        // always restore the original position, even after a failure
        seek(oldPos);
      }
      return nread;
    }
  }

  /**
   * Validation code, available for use in subclasses.
   * @param position position: if negative an EOF exception is raised
   * @param buffer destination buffer
   * @param offset offset within the buffer
   * @param length length of bytes to read
   * @throws EOFException if the position is negative
   * @throws IndexOutOfBoundsException if the offset is negative or there
   * isn't space for the amount of data requested.
   * @throws IllegalArgumentException other arguments are invalid.
   */
  protected void validatePositionedReadArgs(long position,
      byte[] buffer, int offset, int length) throws EOFException {
    Preconditions.checkArgument(length >= 0, "length is negative");
    if (position < 0) {
      throw new EOFException("position is negative");
    }
    Preconditions.checkArgument(buffer != null, "Null buffer");
    if (offset < 0) {
      // Without this check a negative offset makes the capacity computed
      // below larger than the buffer (or overflows it), so an invalid
      // offset could slip through to the underlying read.
      throw new IndexOutOfBoundsException(
          "offset is negative: " + offset);
    }
    if (buffer.length - offset < length) {
      throw new IndexOutOfBoundsException(
          FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
              + ": request length=" + length
              + ", with offset ="+ offset
              + "; buffer capacity =" + (buffer.length - offset));
    }
  }

  /**
   * Read exactly {@code length} bytes starting at file offset
   * {@code position}, by issuing positioned reads until the requested
   * amount has been stored in {@code buffer}.
   *
   * @param position file offset to start reading from
   * @param buffer destination buffer
   * @param offset index in {@code buffer} at which to store the first byte
   * @param length number of bytes to read
   * @throws EOFException if a positioned read returns a negative count
   * before {@code length} bytes have been read, or the position is negative
   * @throws IOException if a read fails for another reason
   */
  @Override
  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    validatePositionedReadArgs(position, buffer, offset, length);
    int nread = 0;
    while (nread < length) {
      // advance both the file position and the buffer index by what has
      // been read so far
      int nbytes = read(position + nread,
          buffer,
          offset + nread,
          length - nread);
      if (nbytes < 0) {
        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
      }
      nread += nbytes;
    }
  }

  /**
   * Read {@code buffer.length} bytes starting at file offset
   * {@code position}, filling the whole of {@code buffer}.
   *
   * @param position file offset to start reading from
   * @param buffer destination buffer, filled from index 0
   * @throws IOException if the read fails, including an
   * {@link EOFException} when the data ends before the buffer is full
   */
  @Override
  public void readFully(long position, byte[] buffer)
      throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}