The core behavior of FSDataInputStream is defined by java.io.DataInputStream, with extensions that add key assumptions to the system.
Files are opened via FileSystem.open(p), which, if successful, returns:
result = FSDataInputStream(0, FS.Files[p])
The stream can be modeled as:
FSDIS = (pos, data[], isOpen)
with access functions:
pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)
Implicit invariant: the size of the data stream equals the size of the file as returned by FileSystem.getFileStatus(Path p):
forall p in dom(FS.Files) : len(data(FSDIS)) == FS.getFileStatus(p).length
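As a rough illustration of the model above, the following Java sketch treats FS.Files as an in-memory map from path to bytes. ModelFileSystem, ModelStream, OpenDemo and the example path are hypothetical names invented for this sketch, not Hadoop classes. It only shows that a newly opened stream starts at pos 0 and that its data length matches the reported file length.

```java
import java.io.FileNotFoundException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of FS.Files (path -> contents).
// ModelFileSystem and ModelStream are illustrative names, not Hadoop classes.
class ModelFileSystem {
    final Map<String, byte[]> files = new HashMap<>();

    // Models FileSystem.open(p): result = FSDataInputStream(0, FS.Files[p])
    ModelStream open(String p) throws FileNotFoundException {
        return new ModelStream(0, contents(p));
    }

    // Models FileSystem.getFileStatus(p).length
    long getLength(String p) throws FileNotFoundException {
        return contents(p).length;
    }

    private byte[] contents(String p) throws FileNotFoundException {
        byte[] data = files.get(p);
        if (data == null) {
            throw new FileNotFoundException(p);
        }
        return data;
    }
}

// FSDIS = (pos, data[], isOpen)
class ModelStream {
    long pos;
    final byte[] data;
    boolean isOpen = true;

    ModelStream(long pos, byte[] data) {
        this.pos = pos;
        this.data = data;
    }
}

class OpenDemo {
    public static void main(String[] args) throws FileNotFoundException {
        ModelFileSystem fs = new ModelFileSystem();
        fs.files.put("/data/a.txt", "hello".getBytes(StandardCharsets.US_ASCII));
        ModelStream in = fs.open("/data/a.txt");
        // Implicit invariant: len(data(FSDIS)) == FS.getFileStatus(p).length
        System.out.println(in.pos + " " + in.data.length + " " + fs.getLength("/data/a.txt")); // 0 5 5
    }
}
```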
For Closeable.close(), the semantics of java.io.Closeable are defined in the interface definition within the JRE.
The operation MUST be idempotent; the following sequence is not an error:
FSDIS.close(); FSDIS.close();
Implementations SHOULD be robust against failure. If an inner stream is to be closed, it SHOULD first be checked for null.
Implementations SHOULD NOT raise an IOException (or any other exception) during this operation. Client applications often ignore such exceptions, or may fail unexpectedly when one is raised.
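A minimal sketch of a close() that meets these requirements, assuming a hypothetical WrappingStream that wraps some inner Closeable: the second call is a no-op, a null inner stream is tolerated, and an IOException from the inner stream is swallowed rather than propagated.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical wrapper used only to illustrate the close() contract;
// "inner" stands in for whatever underlying stream an implementation wraps.
class WrappingStream implements Closeable {
    private Closeable inner;
    private boolean isOpen = true;

    WrappingStream(Closeable inner) {
        this.inner = inner;
    }

    boolean isOpen() {
        return isOpen;
    }

    @Override
    public void close() { // narrower than Closeable.close(): declares no IOException
        if (!isOpen) {
            return; // idempotent: a second close() is a no-op, not an error
        }
        isOpen = false;
        Closeable toClose = inner;
        inner = null;
        if (toClose != null) { // robust: the inner stream may never have been set up
            try {
                toClose.close();
            } catch (IOException e) {
                // Swallowed (a real implementation might log it): callers often
                // ignore exceptions from close(), or fail unexpectedly on them.
            }
        }
    }
}

class CloseDemo {
    public static void main(String[] args) {
        WrappingStream s = new WrappingStream(() -> { throw new IOException("inner failure"); });
        s.close();
        s.close(); // not an error
        System.out.println(s.isOpen()); // false
    }
}
```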
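InputStream.read() returns the data at the current position, or -1 if the end of the data has been reached.
InputStream.read(buffer[], offset, length) reads up to length bytes of data into the destination buffer, starting at offset offset.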
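The two read operations can be sketched against the (pos, data[], isOpen) model as follows. ByteModelStream is a hypothetical class. The end-of-data result of -1, a zero-length read returning 0, and the bounds check follow the general java.io.InputStream contract; rejecting reads on a closed stream with an IOException is an assumption of this sketch.

```java
import java.io.IOException;

// Minimal in-memory model of FSDIS = (pos, data[], isOpen); illustrative only.
class ByteModelStream {
    private final byte[] data;
    private int pos = 0;
    private boolean isOpen = true;

    ByteModelStream(byte[] data) {
        this.data = data;
    }

    private void checkOpen() throws IOException {
        if (!isOpen) {
            throw new IOException("Stream is closed"); // assumed behavior
        }
    }

    // read(): the byte at pos as 0..255, advancing pos; -1 at end of data
    int read() throws IOException {
        checkOpen();
        if (pos >= data.length) {
            return -1;
        }
        return data[pos++] & 0xff;
    }

    // read(buffer, offset, length): copies up to length bytes into
    // buffer[offset..]; returns the count read, or -1 at end of data
    int read(byte[] buffer, int offset, int length) throws IOException {
        checkOpen();
        if (offset < 0 || length < 0 || length > buffer.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            return 0;
        }
        if (pos >= data.length) {
            return -1;
        }
        int n = Math.min(length, data.length - pos);
        System.arraycopy(data, pos, buffer, offset, n);
        pos += n;
        return n;
    }

    long getPos() {
        return pos;
    }

    void close() {
        isOpen = false;
    }
}
```

Note that the bulk read may return fewer than length bytes; callers that need an exact count must loop.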
Not all subclasses implement Seekable.seek(s):
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
If the operation is supported, the file SHOULD be open:
isOpen(FSDIS)
Some filesystems do not perform this check, relying on the read() contract to reject reads on a closed stream (e.g. RawLocalFileSystem).
A seek(0) MUST always succeed; any other seek position must be non-negative and less than the length of the stream's data:
s >= 0 and ((s == 0) or (s < len(data))) else raise [EOFException, IOException]
Some FileSystems do not raise an exception if this condition is not met. They instead return -1 on any read() operation where, at the time of the read, len(data(FSDIS)) < pos(FSDIS).
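The following sketch applies the seek precondition above, assuming a hypothetical SeekModelStream over an in-memory byte array. It takes the strict route of raising EOFException, rather than the lenient behavior just described, where a later read() returns -1.

```java
import java.io.EOFException;
import java.io.IOException;

// Illustrative model of Seekable.seek(s) with the precondition
// s >= 0 and ((s == 0) or (s < len(data))); the class name is hypothetical.
class SeekModelStream {
    private final byte[] data;
    private long pos = 0;
    private boolean isOpen = true;

    SeekModelStream(byte[] data) {
        this.data = data;
    }

    void seek(long s) throws IOException {
        if (!isOpen) {
            throw new IOException("Stream is closed"); // "the file SHOULD be open"
        }
        // seek(0) always succeeds, even on an empty file.
        if (s < 0 || (s != 0 && s >= data.length)) {
            throw new EOFException("Cannot seek to " + s + " in a stream of length " + data.length);
        }
        pos = s; // pos is only updated once the precondition holds
    }

    long getPos() {
        return pos;
    }

    void close() {
        isOpen = false;
    }
}
```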
Seekable.seekToNewSource(offset) instructs the source to retrieve data[] from a different source than the current one. This is only relevant if the filesystem supports multiple replicas of a file and there is more than one replica of the data at offset offset.
Not all subclasses implement this operation; those that do not either raise an exception or return False.
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
Examples: CompressionInputStream, HttpFSFileSystem
If supported, the file must be open:
isOpen(FSDIS)
The majority of subclasses that do not implement this operation simply report failure:
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
Examples: RawLocalFileSystem, HttpFSFileSystem
If the operation is supported and there is a new location for the data:
FSDIS' = (pos, data', true)
result = True
The new data is the original data (or an updated version of it, as covered in the consistency discussion below), but with the block containing the data at offset sourced from a different replica.
If there is no other copy, FSDIS is not updated; the response indicates this:
result = False
Outside of test methods, the primary use of this method is in the FSInputChecker class, which can react to a checksum error in a read by attempting to source the data elsewhere. If a new source can be found, it attempts to reread and recheck that portion of the file.
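The sketch below models a stream over several replicas together with the retry pattern just described. ReplicatedStream, ChecksumRetry and readVerified are hypothetical names, and CRC32 stands in for whatever checksum a real filesystem uses; this is not the actual FSInputChecker code. As in FSDIS' = (pos, data', true), seekToNewSource() leaves pos unchanged, so the caller seeks back to the offset before rereading.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical model of a stream over several replicas of the same file.
class ReplicatedStream {
    private final List<byte[]> replicas;
    private int current = 0;
    private long pos = 0;

    ReplicatedStream(List<byte[]> replicas) {
        this.replicas = replicas;
    }

    // Switches to the next replica, leaving pos unchanged; returns false when
    // no other copy exists, in which case the stream is not updated.
    boolean seekToNewSource(long offset) { // offset is unused in this model
        if (current + 1 >= replicas.size()) {
            return false;
        }
        current++;
        return true;
    }

    void seek(long s) {
        pos = s;
    }

    int read(byte[] buf, int off, int len) {
        byte[] data = replicas.get(current);
        if (pos >= data.length) {
            return -1;
        }
        int n = (int) Math.min(len, data.length - pos);
        System.arraycopy(data, (int) pos, buf, off, n);
        pos += n;
        return n;
    }
}

// Sketch of the checksum-retry pattern; not FSInputChecker's implementation.
class ChecksumRetry {
    static long crc(byte[] b, int len) {
        CRC32 c = new CRC32();
        c.update(b, 0, len);
        return c.getValue();
    }

    static byte[] readVerified(ReplicatedStream in, long offset, int len, long expectedCrc)
            throws IOException {
        byte[] buf = new byte[len];
        while (true) {
            in.seek(offset);
            int n = in.read(buf, 0, len);
            if (n == len && crc(buf, n) == expectedCrc) {
                return buf;
            }
            // Checksum mismatch (or fewer bytes than expected): try another replica.
            if (!in.seekToNewSource(offset)) {
                throw new IOException("Checksum error at offset " + offset + " on every replica");
            }
        }
    }
}

class RetryDemo {
    public static void main(String[] args) throws IOException {
        byte[] good = {1, 2, 3, 4};
        byte[] bad = {1, 2, 9, 4};
        long expected = ChecksumRetry.crc(good, good.length);
        ReplicatedStream in = new ReplicatedStream(Arrays.asList(bad, good));
        System.out.println(Arrays.toString(ChecksumRetry.readVerified(in, 0, 4, expected))); // [1, 2, 3, 4]
    }
}
```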
The PositionedReadable operations supply “positioned reads” (“pread”). They provide the ability to read data into a buffer from a specific position in the data stream. A positioned read is equivalent to a Seekable.seek to a particular offset followed by an InputStream.read(buffer[], offset, length), except that it is a single method invocation rather than a seek followed by a read, and two positioned reads can optionally run concurrently over a single instance of an FSDataInputStream.
The interface declares positioned reads thread-safe (some of the implementations do not follow this guarantee).
Any positional read run concurrently with a stream operation (e.g. Seekable.seek, Seekable.getPos(), and InputStream.read()) MUST run in isolation; there must not be mutual interference.
Concurrent positional reads and stream operations MUST be serializable; one may block the other so they run in series but, for better throughput and ‘liveness’, they SHOULD run concurrently.
Given two parallel positional reads, one at pos1 for len1 into buffer dest1, and another at pos2 for len2 into buffer dest2, AND given a concurrent stream read run after a seek to pos3, the resultant buffers MUST be filled as follows, even if the reads happen to overlap on the underlying stream:
// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1)]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1)]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1)]
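A sketch of one way these guarantees can hold, assuming a hypothetical PreadModelStream whose data never changes: the positioned read never reads or writes pos, so it needs no lock, while seek() and the stream read() are synchronized with each other. PreadDemo runs the three operations from the example above on a thread pool, with overlapping ranges.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative model: positioned reads never touch pos, so they can run
// concurrently with each other and with seek()/read(). Data is treated as immutable.
class PreadModelStream {
    private final byte[] data;
    private long pos = 0;

    PreadModelStream(byte[] data) {
        this.data = data;
    }

    // Positioned read: fills dest[offset..] from data[position..]; pos is untouched
    int read(long position, byte[] dest, int offset, int length) {
        if (position >= data.length) {
            return -1;
        }
        int n = (int) Math.min(length, data.length - position);
        System.arraycopy(data, (int) position, dest, offset, n);
        return n;
    }

    synchronized void seek(long s) {
        pos = s;
    }

    // Stream read: reads at pos, then advances it
    synchronized int read(byte[] dest, int offset, int length) {
        int n = read(pos, dest, offset, length);
        if (n > 0) {
            pos += n;
        }
        return n;
    }
}

class PreadDemo {
    public static void main(String[] args) throws Exception {
        byte[] file = new byte[1000];
        for (int i = 0; i < file.length; i++) {
            file[i] = (byte) i;
        }
        PreadModelStream in = new PreadModelStream(file);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        byte[] dest1 = new byte[100], dest2 = new byte[100], dest3 = new byte[100];
        Future<?> f1 = pool.submit(() -> in.read(100L, dest1, 0, 100)); // pos1 = 100
        Future<?> f2 = pool.submit(() -> in.read(150L, dest2, 0, 100)); // pos2 = 150, overlaps #1
        Future<?> f3 = pool.submit(() -> { in.seek(120); return in.read(dest3, 0, 100); }); // pos3 = 120
        f1.get();
        f2.get();
        f3.get();
        pool.shutdown();
        System.out.println(Arrays.equals(dest1, Arrays.copyOfRange(file, 100, 200))
                && Arrays.equals(dest2, Arrays.copyOfRange(file, 150, 250))
                && Arrays.equals(dest3, Arrays.copyOfRange(file, 120, 220))); // true
    }
}
```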
Not all FSDataInputStream implementations support these operations. Those that do not implement Seekable.seek() do not implement the PositionedReadable interface.
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
This could be considered obvious: if a stream is not Seekable, a client cannot seek to a location. It is also a side effect of the base class implementation, which uses Seekable.seek().
Implicit invariant: for all PositionedReadable operations, the value of pos is unchanged at the end of the operation:
pos(FSDIS') == pos(FSDIS)
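One way to satisfy this invariant when positioned reads are built on seek() and read(), as described above for the base class, is to save pos, seek, read, and then restore pos, all while holding the stream's lock. The sketch assumes a hypothetical SeekReadStream and is not the Hadoop base class itself. Holding the lock serializes positioned reads with stream operations, which meets the MUST of the concurrency rules above but not the SHOULD about running concurrently.

```java
import java.io.EOFException;
import java.io.IOException;

// Hypothetical seekable stream whose positioned read is built from
// seek() + read() and restores pos, so that pos(FSDIS') == pos(FSDIS).
class SeekReadStream {
    private final byte[] data;
    private long pos = 0;

    SeekReadStream(byte[] data) {
        this.data = data;
    }

    synchronized long getPos() {
        return pos;
    }

    synchronized void seek(long s) throws EOFException {
        if (s < 0 || (s != 0 && s >= data.length)) {
            throw new EOFException("Cannot seek to " + s);
        }
        pos = s;
    }

    synchronized int read(byte[] buf, int off, int len) {
        if (pos >= data.length) {
            return -1;
        }
        int n = (int) Math.min(len, data.length - pos);
        System.arraycopy(data, (int) pos, buf, off, n);
        pos += n;
        return n;
    }

    // Positioned read: the lock makes seek + read + restore atomic with
    // respect to other stream operations on this instance.
    synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
        long oldPos = getPos();
        try {
            seek(position);
            return read(buf, off, len);
        } catch (EOFException e) {
            return -1; // position is beyond the end of the data
        } finally {
            // Restore directly: oldPos may equal len(data), which seek() rejects.
            pos = oldPos;
        }
    }
}
```

Restoring pos by assignment rather than by seek(oldPos) matters after a stream has been read to the end, where pos == len(data).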
At time t0:
FSDIS0 = FS.open(p) = (0, data0[])
At time t1:
FS' = FS where FS'.Files[p] = data1
From time t >= t1, the value of FSDIS0 is undefined.
It may be unchanged:
FSDIS0.data == data0
forall l in len(FSDIS0.data): FSDIS0.read() == data0[l]
It may pick up the new data:
FSDIS0.data == data1
forall l in len(FSDIS0.data): FSDIS0.read() == data1[l]
It may be inconsistent, such that a read of an offset returns data from either of the datasets:
forall l in len(FSDIS0.data): (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l])
That is, every value read may be from the original or updated file.
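The mixed case can arise naturally from block caching. The sketch below assumes a hypothetical CachingReader that fetches each fixed-size block from the current contents of FS.Files the first time it is needed, then serves it from a cache; it also assumes data0 and data1 have the same length. If the file is overwritten after the first block has been cached, reads of that block return data0 while later blocks return data1.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical block-caching reader; assumes data0 and data1 have equal length.
class CachingReader {
    static final int BLOCK = 4;
    private final Map<String, byte[]> files; // models FS.Files
    private final String path;
    private final Map<Integer, byte[]> cache = new HashMap<>();

    CachingReader(Map<String, byte[]> files, String path) {
        this.files = files;
        this.path = path;
    }

    // Models FSDIS0.read(l): the byte at offset l
    byte read(int l) {
        byte[] block = cache.computeIfAbsent(l / BLOCK, b -> {
            byte[] current = files.get(path); // whichever version is there now
            int start = b * BLOCK;
            return Arrays.copyOfRange(current, start, Math.min(start + BLOCK, current.length));
        });
        return block[l % BLOCK];
    }
}

class ConsistencyDemo {
    public static void main(String[] args) {
        Map<String, byte[]> fs = new HashMap<>();
        fs.put("/p", new byte[] {0, 0, 0, 0, 0, 0, 0, 0}); // data0 at t0
        CachingReader in = new CachingReader(fs, "/p");
        byte first = in.read(0);                          // block 0 cached from data0
        fs.put("/p", new byte[] {1, 1, 1, 1, 1, 1, 1, 1}); // data1 at t1
        System.out.println(first + " " + in.read(1) + " " + in.read(4)); // 0 0 1
    }
}
```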
It may also be inconsistent on repeated reads of the same offset; that is, at time t2 > t1:
r2 = FSDIS0.read(l)
While at time t3 > t2:
r3 = FSDIS0.read(l)
It may be that r3 != r2. (That is, some of the data may be cached or replicated, and on a subsequent read, a different version of the file’s contents is returned.)
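Replication is one way to produce this effect. In the sketch below, a hypothetical ReplicaRoundRobinReader serves each read from the next replica in turn; one replica still holds data0 while the other has already been updated to data1, so two reads of the same offset disagree.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader over replicas that are updated asynchronously:
// each read(l) is served by the next replica in turn.
class ReplicaRoundRobinReader {
    private final List<byte[]> replicas;
    private int next = 0;

    ReplicaRoundRobinReader(List<byte[]> replicas) {
        this.replicas = replicas;
    }

    byte read(int l) {
        byte[] replica = replicas.get(next);
        next = (next + 1) % replicas.size();
        return replica[l];
    }
}

class RepeatedReadDemo {
    public static void main(String[] args) {
        List<byte[]> replicas = new ArrayList<>();
        replicas.add(new byte[] {3, 3}); // replica still holding data0
        replicas.add(new byte[] {7, 7}); // replica already updated to data1
        ReplicaRoundRobinReader in = new ReplicaRoundRobinReader(replicas);
        byte r2 = in.read(0);              // at t2
        byte r3 = in.read(0);              // at t3 > t2
        System.out.println(r2 + " " + r3); // 3 7, so r3 != r2
    }
}
```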
Similarly, if the data at the path p is deleted, this change MAY or MAY NOT be visible during read operations performed on FSDIS0.