org.apache.hadoop.fs.MultipartUploader
The MultipartUploader can upload a file using multiple parts to Hadoop-supported filesystems. The benefit of a multipart upload is that the file can be uploaded from multiple clients or processes in parallel, and the results are not visible to other clients until the complete function is called.
When implemented by an object store, uploaded data may incur storage charges even before it is visible in the filesystem. Users of this API must be diligent and always perform best-effort attempts to complete or abort the upload. The abortUploadsUnderPath(path) operation can help here.
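As an illustration of the lifecycle, a minimal single-part upload might look like the sketch below. It assumes the uploader is obtained through the FileSystem.createMultipartUploader(path) builder, and the destination URI and data are placeholders; nothing becomes visible at the destination until complete() returns.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical destination; any filesystem with multipart upload support may be used.
    Path dest = new Path("s3a://example-bucket/datasets/output.bin");
    Configuration conf = new Configuration();

    try (FileSystem fs = FileSystem.get(dest.toUri(), conf);
         // Assumption: the createMultipartUploader() builder API is available in this release.
         MultipartUploader uploader = fs.createMultipartUploader(dest).build()) {

      // Start the upload; nothing is visible at dest yet.
      UploadHandle upload = uploader.startUpload(dest).get();

      // Upload a single part; part numbers start at 1.
      byte[] data = "example part data".getBytes(StandardCharsets.UTF_8);
      PartHandle part = uploader.putPart(upload, 1, dest,
          new ByteArrayInputStream(data), data.length).get();

      // complete() makes the concatenated parts visible at dest.
      Map<Integer, PartHandle> parts = new TreeMap<>();
      parts.put(1, part);
      PathHandle result = uploader.complete(upload, dest, parts).get();
      System.out.println("completed upload: " + result);
    }
  }
}
```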
All the requirements of a valid MultipartUploader are considered implicit preconditions and postconditions.
The operations of a single multipart upload may take place across different instances of a multipart uploader, across different processes and hosts. It is therefore a requirement that:
1. All state needed to upload a part, complete an upload or abort an upload must be contained within or retrievable from an upload handle.
2. That handle MUST be serializable; it MUST be deserializable in different processes executing the exact same version of Hadoop. (A sketch of such handle marshalling follows this list.)
3. Different hosts/processes MAY upload different parts, sequentially or simultaneously. The order in which they are uploaded to the filesystem MUST NOT constrain the order in which the data is stored in the final file.
4. An upload MAY be completed on a different instance than any which uploaded parts.
5. The output of an upload MUST NOT be visible at the final destination until the upload completes.
6. It is not an error if a single multipart uploader instance initiates or completes multiple uploads to the same destination sequentially, irrespective of whether or not the store supports concurrent uploads.
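As a sketch of what requirement 2 implies, the opaque bytes of an upload handle could be shipped to another process and rebuilt there. The toByteArray() accessor and the BBUploadHandle.from() factory used here are assumptions about the concrete handle helpers available in the Hadoop release in use; only the round-trip of opaque bytes is essential.

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.UploadHandle;

public final class HandleTransferSketch {

  /** On the host that called startUpload(): extract the opaque bytes. */
  public static byte[] marshal(UploadHandle handle) {
    // Assumption: toByteArray() exposes the handle's opaque byte content.
    return handle.toByteArray();
  }

  /** On another host running the exact same Hadoop version: rebuild the handle. */
  public static UploadHandle unmarshal(byte[] bytes) {
    // Assumption: BBUploadHandle is the byte-buffer backed handle implementation.
    return BBUploadHandle.from(ByteBuffer.wrap(bytes));
  }
}
```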
Multiple processes may upload parts of a multipart upload simultaneously.
If a call is made to startUpload(path) for a destination where an active upload is already in progress, implementations MUST do one of two things: fail the new call, or allow both uploads to proceed. Which upload succeeds is undefined. Users MUST NOT expect consistent behavior across filesystems, across filesystem instances, or even across different requests.
If a multipart upload is completed or aborted while a part upload is in progress, the in-progress upload, if it has not completed, MUST NOT be included in the final file, in whole or in part. Implementations SHOULD raise an error in the putPart() operation.
Users MUST NOT expect that serialized PathHandle versions are compatible across:

* different multipart uploader implementations.
* different versions of the same implementation.

That is: all clients MUST use the exact same version of Hadoop.
A FileSystem/FileContext which supports Multipart Uploads extends the existing model (Directories, Files, Symlinks) to one of (Directories, Files, Symlinks, Uploads), with Uploads of type Map[UploadHandle -> Map[PartHandle -> UploadPart]].

The Uploads element of the state tuple is a map of all active uploads.

    Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]]
An UploadHandle is a non-empty list of bytes.

    UploadHandle: List[byte]
    len(UploadHandle) > 0

Clients MUST treat this as opaque. What is core to this feature's design is that the handle is valid across clients: the handle may be serialized on host hostA, deserialized on hostB and still used to extend or complete the upload.

    UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])

Similarly, the PartHandle type is also a non-empty list of opaque bytes, again marshallable between hosts.

    PartHandle: List[byte]

It is implicit that each UploadHandle in FS.Uploads is unique. Similarly, each PartHandle in the map of [PartHandle -> UploadPart] must also be unique.
If upload handles are recycled, abort(FS, uploadHandle) could unintentionally cancel a successor operation which used the same upload handle.

All operations return CompletableFuture<> types which must be subsequently evaluated to get their return values.
This means that when an implementation interacts with a fast filesystem/store, all preconditions, including the existence of files, MAY be evaluated early, whereas an implementation interacting with a remote object store whose probes are slow MAY verify preconditions in the asynchronous phase, especially those which interact with the remote store.
Java CompletableFutures do not work well with checked exceptions. The Hadoop codebase is still evolving the details of the exception handling here, as more use is made of the asynchronous APIs. Assume that any precondition failure which declares that an IOException MUST be raised may have that operation wrapped in a RuntimeException of some form if evaluated in the future; this also holds for any other IOException raised during the operations.
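Given that wrapping, a defensive way to evaluate one of these futures is to unwrap the cause and rethrow any underlying IOException, as in the sketch below, which uses only the JDK concurrency API. (Recent Hadoop releases also provide helpers such as FutureIO.awaitFuture() for the same purpose.)

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class FutureEval {

  /** Await a future, rethrowing any underlying IOException directly. */
  public static <T> T await(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while awaiting upload operation", e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;            // precondition/IO failure raised directly
      }
      if (cause instanceof RuntimeException
          && cause.getCause() instanceof IOException) {
        throw (IOException) cause.getCause(); // IOException wrapped in a RuntimeException
      }
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new IOException(cause);
    }
  }
}
```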
close()
Applications MUST call close()
after using an uploader; this is so it may release other objects, update statistics, etc.
CompletableFuture<UploadHandle> startUpload(Path)
Starts a Multipart Upload, ultimately returning an UploadHandle
for use in subsequent operations.
if path == "/" : raise IOException if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
If a filesystem does not support concurrent uploads to a destination, then the following precondition is added:
    if path in values(FS.Uploads) raise PathExistsException, IOException
Once the initialization operation completes, the filesystem state is updated with a new active upload under a new handle; this handle is returned to the caller.

Postconditions:

    handle' = UploadHandle where not handle' in keys(FS.Uploads)
    FS' = FS where FS'.Uploads(handle') == {}
    result = handle'
CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)
Upload a part for the specific multipart upload, eventually returning an opaque part handle representing this part of the specified upload.
Preconditions:

    uploadHandle in keys(FS.Uploads)
    partNumber >= 1
    lengthInBytes >= 0
    len(inputStream) >= lengthInBytes
Postconditions:

    data' = inputStream(0..lengthInBytes)
    partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
    FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
    result = partHandle'
The data is stored in the filesystem, pending completion. It MUST NOT be visible at the destination path. It MAY be visible in a temporary path somewhere in the filesystem; this is implementation-specific and MUST NOT be relied upon.
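To illustrate that different parts may be uploaded simultaneously while remaining invisible at the destination, a sketch along the following lines issues every putPart() call at once and collects the handles by part number; the part payloads are placeholders.

```java
import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UploadHandle;

public final class ParallelParts {

  /** Upload every part concurrently, returning a map of part number to handle. */
  public static Map<Integer, PartHandle> uploadAll(
      MultipartUploader uploader, UploadHandle upload, Path dest,
      List<byte[]> partData) {
    Map<Integer, PartHandle> handles = new ConcurrentSkipListMap<>();
    CompletableFuture<?>[] pending = new CompletableFuture<?>[partData.size()];

    for (int i = 0; i < partData.size(); i++) {
      final int partNumber = i + 1;            // part numbers start at 1
      final byte[] data = partData.get(i);
      // Each putPart() call is already asynchronous; issue them all at once.
      pending[i] = uploader
          .putPart(upload, partNumber, dest,
              new ByteArrayInputStream(data), data.length)
          .thenAccept(handle -> handles.put(partNumber, handle));
    }
    // Wait for every part to finish before complete() is attempted.
    CompletableFuture.allOf(pending).join();
    return handles;
  }
}
```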
CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)
Complete the multipart upload.
A Filesystem may enforce a minimum size of each part, excluding the last part uploaded.
If a part is out of this range, an IOException
MUST be raised.
Preconditions:

    uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
    FS.Uploads(uploadHandle).path == path
    if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
    parts.size() > 0
    forall k in keys(parts): k > 0
    forall k in keys(parts):
        not exists(k2 in keys(parts)) where (k2 != k and parts[k] == parts[k2])
All keys MUST be greater than zero, and there MUST NOT be any duplicate references to the same part handle. These validations MAY be performed at any point during the operation. After a failure, there is no guarantee that a complete() call for this upload with a valid map of paths will complete. Callers SHOULD invoke abort() after any such failure to ensure cleanup.
If putPart() operations were performed for this uploadHandle but their PartHandle values were not included in this request, the omitted parts SHALL NOT be a part of the resulting file. The MultipartUploader MUST clean up any such outstanding entries.
In the case of backing stores that support directories (local filesystem, HDFS, etc.), if, at the point of completion, there is now a directory at the destination, then a PathIsDirectoryException or other IOException MUST be thrown.
Postconditions:

    UploadData' == ordered concatenation of all data in the map of parts, ordered by key
    exists(FS', path') and result = PathHandle(path')
    FS' = FS where FS'.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
The PathHandle is returned by the complete operation so subsequent operations will be able to identify that the data has not changed in the meantime.

The order of parts in the uploaded file is that of the natural order of parts in the map: part 1 is ahead of part 2, etc.
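Where the target FileSystem also supports opening files by PathHandle, a caller might use the handle returned by complete() to reopen the data and detect whether it has changed since completion. This is a sketch; stores without PathHandle support will reject the open() call.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathHandle;

public final class ReopenByHandle {

  /** Reopen the completed upload via its PathHandle, if the store supports it. */
  public static FSDataInputStream reopen(FileSystem fs, PathHandle handle)
      throws IOException {
    // Assumption: FileSystem.open(PathHandle) is available; it is expected to
    // fail if the referenced data has changed or been removed since the
    // handle was issued.
    return fs.open(handle);
  }
}
```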
CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
Abort a multipart upload. The handle becomes invalid and not subject to reuse.
Preconditions:

    uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
Postconditions: the upload handle is no longer known.

    FS' = FS where not uploadHandle in keys(FS'.uploads)
A subsequent call to abort()
with the same handle will fail, unless the handle has been recycled.
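A common cleanup pattern, sketched below with a hypothetical UploadBody callback, is to abort the upload whenever the part uploads or the completion fail, so that no billed-but-invisible parts are left behind.

```java
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UploadHandle;

public final class AbortOnFailure {

  /** Run the upload body; abort the multipart upload if it fails. */
  public static void runOrAbort(MultipartUploader uploader, UploadHandle upload,
      Path dest, UploadBody body) throws Exception {
    try {
      body.run();
    } catch (Exception e) {
      try {
        // Best-effort cleanup: discard all uploaded parts for this handle.
        uploader.abort(upload, dest).get();
      } catch (Exception abortFailure) {
        e.addSuppressed(abortFailure);
      }
      throw e;
    }
  }

  /** Hypothetical callback holding the putPart()/complete() calls. */
  public interface UploadBody {
    void run() throws Exception;
  }
}
```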
CompletableFuture<Integer> abortUploadsUnderPath(Path path)
Perform a best-effort cleanup of all uploads under a path.
Returns a future which resolves to:

    -1 if unsupported
    >= 0 if supported
Because it is best-effort, a strict postcondition isn't possible. The ideal postcondition is that all uploads under the path are aborted, and the count is the number of uploads aborted:

    FS'.uploads forall upload in FS.uploads:
        not isDescendant(FS, path, upload.path)
    return len(forall upload in FS.uploads:
        isDescendant(FS, path, upload.path))
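Callers of this operation need to allow for stores which cannot enumerate outstanding uploads; a minimal sketch of that handling:

```java
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.Path;

public final class CleanupSketch {

  /** Best-effort cleanup of abandoned uploads under a directory tree. */
  public static void cleanUp(MultipartUploader uploader, Path dir)
      throws Exception {
    int aborted = uploader.abortUploadsUnderPath(dir).get();
    if (aborted < 0) {
      // -1: the store cannot enumerate or abort uploads under a path.
      System.out.println("Upload cleanup not supported under " + dir);
    } else {
      System.out.println("Aborted " + aborted + " upload(s) under " + dir);
    }
  }
}
```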