This document describes the architecture and other implementation/correctness aspects of the Manifest Committer.
The protocol and its correctness are covered in Manifest Committer Protocol.
The Manifest committer is a committer for job output which provides performance on ABFS for "real world" queries, and both performance and correctness on GCS.
This committer uses the extension point which came in for the S3A committers. Users can declare a new committer factory for `abfs://` and `gcs://` URLs. It can be used through Hadoop MapReduce and Apache Spark.
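As a sketch of how such a binding might look in code: the property pattern `mapreduce.outputcommitter.factory.scheme.<scheme>` is the one used by the S3A committers, and the factory class name should be checked against your Hadoop release.

```java
import org.apache.hadoop.conf.Configuration;

/**
 * Sketch: bind a committer factory to filesystem schemes through the
 * extension point. The property pattern is the one used by the S3A
 * committers; verify the factory class name against your Hadoop release.
 */
public class FactoryBindingSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String factory = "org.apache.hadoop.mapreduce.lib.output"
        + ".committer.manifest.ManifestCommitterFactory";
    // one binding per filesystem scheme
    conf.set("mapreduce.outputcommitter.factory.scheme.abfs", factory);
    // the GCS connector registers the "gs" scheme
    conf.set("mapreduce.outputcommitter.factory.scheme.gs", factory);
  }
}
```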
Term | Meaning |
---|---|
Committer | A class which can be invoked by MR/Spark to perform the task and job commit operations. |
Spark Driver | The Spark process scheduling the work and choreographing the commit operation. |
Job | In MapReduce, the entire application. In Spark, this is a single stage in a chain of work. |
Job Attempt | A single attempt at a job. MR supports multiple job attempts with recovery on partial job failure; Spark says "start again from scratch". |
Task | A subsection of a job, such as processing one file, or one part of a file. |
Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) |
Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/Spark will schedule another. |
Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter. |
Destination directory | The final destination of work. |
Job Attempt Directory | A temporary directory used by the job attempt. This is always underneath the destination directory, so as to ensure it is in the same encryption zone on HDFS, the same storage volume on other filesystems, etc. |
Task Attempt directory | (Also known as the "Task Attempt Working Directory".) A directory exclusive to each task attempt, under which files are written. |
Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that “successful” Task. |
Job Commit | Aggregating all the outputs of all committed tasks and producing the final results of the job. |
The purpose of a committer is to ensure that the complete output of a job ends up in the destination, even in the presence of failures of tasks.
For Hive’s classic hierarchical-directory-structured tables, job committing requires the output of all committed tasks to be put into the correct location in the directory tree.
The committer built into the `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`.
The Manifest Committer is a higher performance committer for ABFS and GCS storage for jobs which create files across deep directory trees through many tasks.
It will also work on `hdfs://` and indeed, `file://` URLs, but it is optimized to address listing and renaming performance and throttling issues in cloud storage.
It will not work correctly with S3, because it relies on an atomic rename-no-overwrite operation to commit the manifest file. It will also have the performance problems of copying rather than moving all the generated data.
Although it will work with MapReduce there is no handling of multiple job attempts with recovery from previous failed attempts.
A manifest file is designed which contains (along with IOStatistics and some other things):

1. A list of destination directories which must be created if they do not exist.
2. A list of files to rename, recorded as (absolute source, absolute destination, file-size) entries.
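Purely as an illustration of that shape (the class and field names below are hypothetical, not the real Hadoop ones, which live under `org.apache.hadoop.mapreduce.lib.output.committer.manifest.files`):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the information a manifest carries; the class and
// field names are illustrative, not the real Hadoop ones.
class FileEntrySketch {
  String source;       // absolute path written by the task attempt
  String destination;  // absolute path to rename it to during job commit
  long size;           // file length, usable for validation
  String etag;         // optional etag, where the store provides one
}

class TaskManifestSketch {
  List<String> directoriesToCreate = new ArrayList<>(); // created if absent
  List<FileEntrySketch> filesToRename = new ArrayList<>();
  // ...plus IOStatistics and other diagnostics
}
```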
Task attempts are committed by:

1. Recursively listing the task attempt working directory to build the manifest's list of directories and list of files to rename.
2. Saving this manifest to a file in the job attempt directory, with a filename derived from the Task ID.

No renaming takes place: the files are left in their original location.

The directory treewalk is single-threaded; it is `O(directories)`, with each directory listing using one or more paged LIST calls.
This is simple, and for most tasks, the scan is off the critical path of the job.
Statistics analysis may justify moving to parallel scans in future.
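A minimal sketch of such a single-threaded treewalk, assuming the standard `FileSystem` API (the class and method names here are illustrative, not the committer's actual code):

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Minimal sketch of the single-threaded treewalk; illustrative only. */
public class TreewalkSketch {

  /**
   * Recursively list a task attempt directory, collecting the files to
   * rename and the directories containing them. One or more paged LIST
   * calls per directory, hence O(directories) overall.
   */
  static void scan(FileSystem fs, Path dir,
      List<Path> directories, List<FileStatus> files) throws IOException {
    directories.add(dir);
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDirectory()) {
        scan(fs, status.getPath(), directories, files);
      } else {
        files.add(status);   // recorded in the manifest; not renamed yet
      }
    }
  }
}
```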
Job Commit consists of:

1. Listing all manifest files in the job attempt directory.
2. Loading each manifest file, creating directories which do not yet exist, then renaming each file in the rename list.
3. Saving a JSON `_SUCCESS` file with the same format as the S3A committer (for testing; use write and rename for atomic save).

The job commit phase supports parallelization for many tasks and many files per task, specifically:

1. Manifests are loaded and processed in a pool of "manifest processor" threads.
2. Directory creation and file rename operations are each processed in a pool of "executor" threads; many renames can execute in parallel as they use minimal network IO.
3. Job cleanup can parallelize deletion of task attempt directories. This is relevant as directory deletion is `O(files)` on Google Cloud Storage, and also on ABFS when OAuth authentication is used.

Optional scan of all ancestors of the destination directories: if any are files, delete them.
Each directory to create is probed with `getFileStatus()` on the path:

* Not found: create the directory, then add an entry for it and for all of its parent paths.
* Found and is a directory: add an entry for it and for all of its parent paths.
* Found and is a file: delete it, then create the directory as before.

Efficiently handling concurrent creation of directories (or delete+create) is going to be a troublespot; some effort is invested there to build the set of directories to create.
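A hedged sketch of that probe-and-create logic, with a shared concurrent map standing in for the set of known directories (illustrative only; not the real implementation):

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of the probe-and-create step; not the real classes. */
public class DirPreparationSketch {

  // shared across the pool of directory-creation threads
  private final Map<Path, Boolean> knownDirs = new ConcurrentHashMap<>();

  void ensureDirectory(FileSystem fs, Path dir) throws IOException {
    if (knownDirs.containsKey(dir)) {
      return;                      // already created/verified in this job
    }
    try {
      FileStatus st = fs.getFileStatus(dir);
      if (st.isFile()) {
        fs.delete(dir, false);     // a file is in the way: delete...
        fs.mkdirs(dir);            // ...then create as before
      }
      // found and is a directory: nothing to create
    } catch (FileNotFoundException e) {
      fs.mkdirs(dir);              // not found: create (parents included)
    }
    // add an entry for this path and for all of its parents
    for (Path p = dir; p != null; p = p.getParent()) {
      knownDirs.put(p, Boolean.TRUE);
    }
  }
}
```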
Files are renamed in parallel.
A pre-rename check for anything existing at the destination path (deleting it if found) will be optional. With Spark creating new UUIDs for each filename, a collision isn't going to happen, and skipping the check saves HTTP requests.
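A sketch of how renames might be queued into a pool and awaited, under the assumption that a failed rename must fail the whole commit (illustrative; not the actual code):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of the parallel rename phase; not the actual code. */
public class RenamePoolSketch {

  /** Queue each (source, destination) pair into a pool, then await all. */
  static void renameAll(FileSystem fs, List<Path[]> pairs) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(32);
    try {
      List<Future<Boolean>> results = new ArrayList<>();
      for (Path[] pair : pairs) {
        // rename is a metadata operation: many can proceed in parallel
        results.add(pool.submit(() -> fs.rename(pair[0], pair[1])));
      }
      for (Future<Boolean> result : results) {
        if (!result.get()) {
          // escalate: a failed rename must fail the whole job commit
          throw new IOException("rename failed during job commit");
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```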
An optional scan of all committed files can verify their length and, if known, etag. This is for testing and diagnostics.
This solution is necessary for GCS, and should be beneficial on ABFS, as the listing overheads are paid for in the task commits rather than in job commit.
A key goal is to keep the manifest committer isolated, touching neither the existing committer code nor other parts of the Hadoop codebase.
It must plug directly into MR and Spark without needing any changes other than those already implemented for the S3A committers: `PathOutputCommitterFactory`. As a result of this there's a bit of copy and paste from elsewhere, e.g. `org.apache.hadoop.util.functional.TaskPool` is based on the S3A committer's `org.apache.hadoop.fs.s3a.commit.Tasks`.
The `_SUCCESS` file MUST be compatible with the S3A JSON file. This is to ensure any existing test suites which validate S3A committer output can be retargeted at jobs executed by the manifest committer without any changes.
When? Proposed: heartbeat until renaming finally finishes.
We would want to stop the entire job commit. Some atomic boolean "abort job" would need to be checked in each task committer thread's iteration through a directory (or in the processing of each file?). Failures in listing or renaming will need to be escalated to halting the entire job commit. This implies that any IOE raised in an asynchronous rename operation or in a task committer thread must:

1. Be caught.
2. Trigger the halt of the rest of the job commit.
3. Be rethrown in the `commitJob()` call.

If a job commit stage is using a thread pool for per-task operations, e.g. loading files, that same thread pool MUST NOT be used for parallel operations within the per-task stage.
As every `JobStage` is executed in sequence within task or job commit, it is safe to share the same thread pool across stages.
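To see why per-task work must not fan out into the same pool, consider this illustrative fragment: if every pool thread blocks waiting on subtasks queued behind it on that same pool, no thread is left to run those subtasks, and the commit hangs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Illustrative only: the thread-starvation hazard of reusing one pool at
 * two levels. With enough outer tasks, every pool thread blocks in get()
 * and the inner work can never be scheduled.
 */
public class PoolDeadlockSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Future<?> outer = pool.submit(() -> {
      // a per-task operation submitting parallel work to the SAME pool...
      Future<?> inner = pool.submit(() -> { /* e.g. rename one file */ });
      try {
        // ...then blocking a pool thread until it completes. With all
        // threads blocked like this, the inner tasks starve: deadlock.
        inner.get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    outer.get();
    pool.shutdown();
  }
}
```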
In the current implementation, there is no parallel "per manifest" operation in job commit other than the actual loading of the files; the operations to create directories and to rename files are performed without parallel processing of individual manifests.
* Directory Preparation: merge the directory lists of all manifests, then queue for creation the (hopefully very much smaller) set of unique directories.
* Rename: iterate through all manifests and queue their renames into a pool for renaming.
The lifespan of thread pools is constrained to that of the stage configuration, which will be limited to within each of the `PathOutputCommitter` methods to setup, commit, abort and cleanup.
This avoids the thread pool lifecycle problems of the S3A Committers.
This was a failure mode seen in terasorting, where many tasks each generated many files; the full list of files to commit (and the etag of every block) was built up in memory and validated prior to execution.

The manifest committer assumes that the amount of data being stored in memory is smaller, because there is no longer the need to store an etag for every block of every file being committed.

This assumption turned out not to hold for some jobs: MAPREDUCE-7435, *ManifestCommitter OOM on azure job*.

The strategy here was to read in all manifests and stream their entries to a local file, as Hadoop Writable objects, hence with lower marshalling overhead than JSON.
Combine all lists of directories to create and eliminate duplicates.
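As an illustration of the Writable approach (the committer's real entry classes differ; this is a hypothetical minimal record):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * Hypothetical sketch of a rename entry spilled to a local file as a
 * Writable record. Binary field-by-field serialization avoids the
 * marshalling overhead of JSON.
 */
class RenameEntrySketch implements Writable {
  final Text source = new Text();
  final Text destination = new Text();
  long size;

  @Override
  public void write(DataOutput out) throws IOException {
    source.write(out);
    destination.write(out);
    out.writeLong(size);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    source.readFields(in);
    destination.readFields(in);
    size = in.readLong();
  }
}
```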
The implementation architecture reflects lessons from the S3A Connector.
The committer collects duration statistics on all the operations it performs/invokes against filesystems.

* Those collected during task commit are saved to the manifest (excluding the time to save and rename that file).
* When these manifests are loaded during job commit, these statistics are merged to form aggregate statistics of the whole job.
* These aggregate statistics are saved to the `_SUCCESS` file, and to any copy of that file in the directory specified by `mapreduce.manifest.committer.summary.report.directory`, if set.
* The class `org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter` can load and print these.
IO statistics from filesystems and from the input and output streams used in a query are not collected.
When invoking the `ManifestCommitter` via the `PathOutputCommitter` API, the following attributes are added to the active (thread) context:
Key | Value |
---|---|
`ji` | Job ID |
`tai` | Task Attempt ID |
`st` | Stage |
These are also all set in all the helper threads performing work as part of a stage’s execution.
Any store/FS which supports auditing is able to collect this data and include it in its logs.
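A hedged sketch of how such attributes can be attached to the current thread's audit context via the public `CommonAuditContext` API; the committer's actual logic lives in `AuditingIntegration`:

```java
import org.apache.hadoop.fs.audit.CommonAuditContext;

/**
 * Sketch: set the job/task/stage attributes on the current thread's audit
 * context; the real code lives in AuditingIntegration, not this class.
 */
public class AuditContextSketch {
  static void enterStage(String jobId, String taskAttemptId, String stage) {
    CommonAuditContext context = CommonAuditContext.currentAuditContext();
    context.put("ji", jobId);
    context.put("tai", taskAttemptId);
    context.put("st", stage);
  }
}
```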
To ease backporting, all audit integration is in the single class `org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration`.