This document describes the commit protocol of the Manifest Committer.
Term | Meaning |
---|---|
Committer | A class which can be invoked by MR or Spark to perform the task and job commit operations. |
Spark Driver | The spark process scheduling the work and choreographing the commit operation. |
Job | In MapReduce: the entire application. In Spark: a single stage in a chain of work
Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says “start again from scratch” |
Task | a subsection of a job, such as processing one file, or one part of a file |
Task ID | ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-0001, etc.)
Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/spark will schedule another. |
Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter. |
Destination directory | The final destination of work. |
Job Attempt Directory | A temporary directory used by the job attempt. This is always underneath the destination directory, so as to ensure it is in the same encryption zone (HDFS), storage volume (other filesystems), etc.
Task Attempt directory | Directory under the Job Attempt Directory where task attempts create subdirectories for their own work
Task Attempt Working Directory | Directory exclusive for each task attempt under which files are written |
Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that “successful” Task. |
Job Commit | aggregating all the outputs of all committed tasks and producing the final results of the job. |
The purpose of a committer is to ensure that the complete output of a job ends up in the destination, even in the presence of failures of tasks.
For Hive’s classic hierarchical-directory-structured tables, job committing requires the output of all committed tasks to be put into the correct location in the directory tree.
The committer built into the `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`.
It has two algorithms, v1 and v2.
The v1 algorithm is resilient to all forms of task failure, but slow when committing the final aggregate output as it renames each newly created file to the correct place in the table one by one.
The v2 algorithm is not considered safe because the output is visible when individual tasks commit, rather than being delayed until job commit. It is possible for multiple task attempts to get their data into the output directory tree, and if a job fails or is aborted before the job is committed, this output is visible.
1. The job attempt directory `$dest/__temporary/$jobAttemptId/` contains all output of the job in progress.
1. Every task attempt is allocated its own task attempt dir `$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId`.
1. All work for a task is written under the task attempt directory. If the output is a deep tree with files at the root, the task attempt dir will end up with a similar structure, with the files it has generated and the directories above them.
The task attempt dir is renamed directly underneath the job attempt dir:

```
rename(
  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
  $dest/__temporary/$jobAttemptId/$taskId)
```
For each committed task, all files underneath are renamed into the destination directory, with each path relative to the base directory of the task remapped onto the destination dir.
That is, everything under `$dest/__temporary/$jobAttemptId/$taskId` is converted to a path under `$dest`.
A recursive treewalk identifies the paths to rename in each TA directory. There's some optimisation if the task directory tree contains a subdirectory which does not exist under the destination: in this case the whole directory can be renamed. If the directory already exists, a file-by-file merge takes place for that dir, with the action for subdirectories again depending on the presence of the destination.
As a result, if the output of each task goes to a separate final directory (e.g. the final partition is unique to a single task), the rename is O(1) for the dir, irrespective of children. If the output is to be in the same dir as other tasks (or updating existing directories), then the rename performance becomes O(files).
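To illustrate that merge logic, here is a minimal, hypothetical Java sketch of such a treewalk using the Hadoop `FileSystem` API. It is not the actual `FileOutputCommitter` code; the method name and the handling of pre-existing files are assumptions.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: a simplified merge-rename treewalk in the style described above.
public final class MergeRenameSketch {

  // Rename src (a file or directory under the committed task dir) to dest under $dest.
  static void mergePaths(FileSystem fs, FileStatus src, Path dest) throws IOException {
    if (src.isFile()) {
      // Files are renamed into place one by one, replacing anything already there.
      fs.delete(dest, true);
      fs.rename(src.getPath(), dest);
    } else if (!fs.exists(dest)) {
      // Fast path: the destination directory is absent, so the whole directory
      // can be renamed in a single operation.
      fs.rename(src.getPath(), dest);
    } else {
      // Slow path: the destination directory exists, so merge child by child.
      for (FileStatus child : fs.listStatus(src.getPath())) {
        mergePaths(fs, child, new Path(dest, child.getPath().getName()));
      }
    }
  }
}
```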
Finally, a 0-byte `_SUCCESS` file is written iff `mapreduce.fileoutputcommitter.marksuccessfuljobs` is true.
The files under the task attempt dir are renamed one by one into the destination directory. There's no attempt at optimising directory renaming, because other tasks may be committing their work at the same time. It is therefore O(files) plus the cost of listing the directory tree. Again: this is done with a recursive treewalk, not a deep `listFiles(path, recursive=true)` API call, which would be faster on HDFS and (though not relevant here) S3.
A 0-byte `_SUCCESS` file is written iff `mapreduce.fileoutputcommitter.marksuccessfuljobs` is true.
If, for a Task T1, Task Attempt 1 (T1A1) fails before committing, the driver will schedule a new attempt “T1A2”, and commit it. All is good.
But: if T1A1 was given permission to commit and it failed during the commit process, some of its output may have been written to the destination directory.
If attempt T1A2 was then told to commit, the already-renamed files would only be overwritten if its output used exactly the same set of file names. If different filenames were generated, then the output would contain the files of both T1A1 and T1A2.
If T1A1 became partitioned during the commit process, then the job committer would schedule another attempt and commit its work. However, if T1A1 still had connectivity to the filesystem, it could still be renaming files. The output of the two tasks could be intermingled even if the same filenames were used.
The paper, "A Zero-Rename Committer" (Loughran et al.), covers these committers. It also describes the commit problem, defines correctness, and describes the algorithms of the v1 and v2 committers, as well as those of the S3A committers, the IBM Stocator committer, and what we know of EMR's Spark committer.
The `hadoop-aws` JAR contains a pair of committers, "Staging" and "Magic". Both of these address the same problem: safely and rapidly committing work to an S3 object store.
The committers take advantage of the fact that S3 offers an atomic way to create a file: the PUT request. Files either exist or they don't. A file can be uploaded directly to its destination, and it is only when the upload completes that the file is manifest, overwriting any existing copy.
For large files, a multipart upload allows this upload operation to be split into a series of POST requests:

1. `initiate-upload (path -> upload ID)`
1. `upload part(path, upload ID, data[]) -> checksum`. This can be parallelised. Up to 10,000 parts can be uploaded to a single object. All but the final part must be >= 5MB.
1. `complete-upload (path, upload ID, List<checksum>)`. This manifests the file, building it from the parts in the sequence of blocks defined by the ordering of the checksums.
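For illustration only, here is roughly how that three-step sequence looks with the AWS SDK for Java (v1). The bucket, key and file names are made up, and the S3A committers drive these operations through the S3A filesystem client rather than the raw SDK.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

// Illustrative sketch of the initiate/upload/complete sequence described above.
public class MultipartUploadSketch {
  public static void main(String[] args) {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    String bucket = "example-bucket";          // assumption: placeholder bucket
    String key = "output/part-0000";           // assumption: placeholder object key
    File data = new File("part-0000");         // assumption: local data to upload

    // 1. initiate-upload: path -> upload ID
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

    // 2. upload part(s): each part returns a checksum (etag)
    List<PartETag> etags = new ArrayList<>();
    etags.add(s3.uploadPart(new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withUploadId(uploadId)
        .withPartNumber(1)
        .withFile(data)
        .withPartSize(data.length())).getPartETag());

    // 3. complete-upload: only now does the object become visible at its path
    s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
  }
}
```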
The secret for the S3A committers is that the final POST request can be delayed until the job commit phase, even though the files are uploaded during task attempt execution/commit. The task attempts need to determine the final destination of each file, upload the data as part of a multipart operation, then save the information needed to complete the upload in a file which is later read by the job committer and used in a POST request.
The Staging Committer is based on the contribution by Ryan Blue of Netflix. It relies on HDFS to be the consistent store used to propagate the `.pendingset` files.
The working directory of each task attempt is in the local filesystem, “the staging directory”. The information needed to complete the uploads is passed from Task Attempts to the Job Committer by using a v1 FileOutputCommitter working with the cluster HDFS filesystem. This ensures that the committer has the same correctness guarantees as the v1 algorithm.
The Magic Committer is purely-S3A and takes advantage of the fact that the authors could make changes within the file system client itself.
“Magic” paths are defined which, when files are written under them, initiate a multipart upload to the final destination directory. When the output stream is `close()`d, a zero byte marker file is written to the magic path, and a JSON `.pending` file containing all the information needed to complete the upload is saved.
Task commit:

1. List all `.pending` files under each task attempt's magic directory.
1. Aggregate them to a `.pendingset` file.
1. Save it to the job attempt directory with the task ID.
Job commit:

1. Load the `.pendingset` files in the job attempt directory.
1. Complete the uploads they list.

The Magic committer absolutely requires a consistent S3 store, originally with S3Guard. Now that S3 is consistent, raw S3 can be used. It does not need HDFS or any other filesystem with `rename()`.
The S3A committers are considered correct because nothing is materialized at the destination until the uploads are completed in job commit.
Significant issues which were fixed include a failure when a `pendingset` file already exists, and getting a unique job ID passed down in `spark.sql.sources.writeJobUUID`.
Of those which affected correctness rather than scale/performance/UX: HADOOP-17258 involved recovery from a failure after TA1's task commit had completed but it had failed to report in. SPARK-33402, SPARK-33230 and HADOOP-17318 are all related: if two Spark jobs/stages started in the same second, they had the same job ID. This caused the HDFS directories used by the staging committers to be intermingled.
What is notable is this: these are all problems which the minimal integration test suites did not discover.
The good news: we now know of these issues, are better placed to avoid replicating them, and know what to write tests for.
The V1 committer underperforms on ABFS because:
The V2 committer is much faster in the job commit because it performs the list and rename process in the task commit. Because that task commit is non-atomic, it is considered dangerous to use. What the V2 task commit algorithm does show is that it is possible to parallelise committing the output of different tasks by using file-by-file rename exclusively.
The V1 committer underperforms on GCS because even the task commit operation, a directory rename, is a non-atomic O(files) operation. This also means that it is unsafe.
If the task attempt has become partitioned and the Spark driver schedules/commits another TA, then the task dir may contain one or more files from the first attempt.
Stores/filesystems supported by this committer MUST:

* Offer an `O(1)` file rename operation.

Stores/filesystems supported by this committer SHOULD:

* Implement the `EtagSource` interface of HADOOP-17979. This is used for ABFS rename recovery, and for optional validation of the final output.

Stores/filesystems supported by this committer MAY:

Stores/filesystems supported by this committer MAY NOT:

* Offer `O(1)` directory deletion. The `CleanupJobStage` assumes this is not the case and so deletes task attempt directories in parallel.
* Offer an atomic `create(Path, overwrite=false)` operation. The manifests are committed by writing to a path including the task attempt ID, then renamed to their final path.
* Offer fast `listFiles(path, recursive=true)` calls. This API call is not used.

When compared with the `FileOutputCommitter`, the requirements which have been removed are:

* `O(1)` directory deletion.

HDFS meets all those requirements, so does not benefit significantly from this committer, though it will still work there.
The S3 store does not meet the rename requirements of this committer, even now that it is consistent. This committer is not safe to use on S3.
Every job MUST have a unique ID.
The implementation expects the Spark runtime to have the relevant patches to ensure this.
The job ID is used to name temporary directories, rather than using the classic incrementing natural numbering scheme of `_temporary/0/`. That scheme comes from MapReduce, where job attempts with attempt ID > 1 look for tasks committed by predecessors and incorporate them into their results.
This committer targets Spark, where there is no attempt at recovery. By using the job ID in paths, if jobs are configured to not delete all of `_temporary` in job cleanup/abort, then multiple jobs MAY be executed using the same table as their destination.
Task IDs and Task Attempt IDs will be derived from Job IDs as usual.
It is expected that filenames of written files SHALL be unique. This is done in Spark for ORC and Parquet files, and allows for checks for destination files to be omitted by default.
Given a destination directory `destDir: Path`, a job of id `jobID: String` and attempt number `jobAttemptNumber: int` will use the directory:

```
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
```

for its work (note: it will actually format that final subdir with `%02d`).
This is termed the Job Attempt Directory.
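As a purely illustrative example (the values and class name below are assumptions), the job attempt directory could be composed like this:

```java
// Illustrative only: composing the job attempt directory described above.
public final class JobAttemptDirSketch {
  public static void main(String[] args) {
    String destDir = "abfs://container@account.dfs.core.windows.net/output";  // assumed
    String jobID = "job_20230101_0001";                                       // assumed
    int jobAttemptNumber = 0;

    // The final subdirectory is formatted with %02d, so attempt 0 becomes "00".
    String jobAttemptDir = String.format("%s/_temporary/manifest_%s/%02d/",
        destDir, jobID, jobAttemptNumber);
    System.out.println(jobAttemptDir);
    // abfs://container@account.dfs.core.windows.net/output/_temporary/manifest_job_20230101_0001/00/
  }
}
```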
Under the Job Attempt Directory, a subdirectory `tasks` is created. This is termed the Task Attempt Directory. Every task attempt will have its own subdirectory of this, into which its work will be saved.
Under the Job Attempt Directory, a subdirectory `manifests` is created. This is termed the Manifest Directory.
The manifests of all committed tasks will be saved to this directory with the filename of `$taskId-manifest.json`.
The full path

```
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
```

is the final location for the manifest of all files created by a committed task. It is termed the Manifest Path of a Committed Task.
Task attempts will save their manifest into this directory with a temporary filename `$taskAttemptId-manifest.json.tmp`. This is termed the Temporary Path of a Task Attempt's Manifest.
For the job and task operations, the following paths are defined:
```
let jobDirectory = "$destDir/_temporary/manifest_$jobID/"
let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/"
let manifestDirectory = jobAttemptDirectory + "manifests/"
let taskAttemptDirectory = jobAttemptDirectory + "tasks/"
```
And for each task attempt, the following paths are also defined:
```
let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId"
let taskManifestPath = manifestDirectory + "$taskId-manifest.json"
let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"
```
This is a JSON file which contains (along with IOStatistics and some diagnostics):

* A list of destination directories which must be created.
* A list of files to rename, recorded as (source, destination, etag) entries.
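The sketch below models that logical content in Java; the class and field names are illustrative assumptions, not the actual serialized manifest format.

```java
import java.util.List;

// Illustrative model only: the logical content of a task manifest as described above.
// The real JSON format also carries IOStatistics and diagnostics.
class TaskManifestModel {
  List<String> directoriesToCreate;  // destination directories which must exist before renaming
  List<FileEntry> filesToRename;     // one entry per file written by the task attempt

  static class FileEntry {
    String source;        // path under the task attempt working directory
    String destination;   // final path under the destination directory
    String etag;          // etag, where available, for rename validation/recovery
  }
}
```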
Job setup creates the job attempt, manifest and task attempt directories:

```
mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)
```
Task setup creates the task attempt's working directory:

```
mkdir(taskAttemptWorkingDirectory)
```
Task attempts are committed by:

1. Scanning the task attempt working directory for the files created and the directories beneath it.
1. Building a manifest of the directories to create and the files to rename.
1. Saving this manifest to the Temporary Path of the Task Attempt's Manifest.
1. Renaming it to the Manifest Path of the Committed Task.

No renaming of the task's files takes place at this point: they are left in their original location until renamed in job commit.
```
let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)

manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)
```
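A hypothetical Java sketch of the `scan()` step, using the Hadoop `FileSystem` API; the structure and names are assumptions, not the committer's actual implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: walk the task attempt working directory, recording the
// directories to create and the (source, destination) renames for the manifest.
class ScanSketch {
  final List<Path> directories = new ArrayList<>();
  final List<Path[]> renames = new ArrayList<>();   // {source, destination} pairs

  void scan(FileSystem fs, Path dir, Path destDir) throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      Path dest = new Path(destDir, status.getPath().getName());
      if (status.isDirectory()) {
        directories.add(dest);                      // must exist before job commit renames
        scan(fs, status.getPath(), dest);
      } else {
        renames.add(new Path[]{status.getPath(), dest});
      }
    }
  }
}
```

The initial call would pass the task attempt working directory and the job's destination directory.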
Aborting or cleaning up a task attempt deletes its working directory:

```
delete(taskAttemptWorkingDirectory)
```
Job Commit consists of:

1. Loading all manifests in the job attempt directory.
1. Creating every destination directory listed in those manifests.
1. Renaming every file listed in those manifests to its destination path.
1. Saving a `_SUCCESS` file with the same format as the S3A committer (for testing; use write and rename for atomic save).

The job commit phase supports parallelization for many tasks and many files per task; specifically, there is a thread pool for parallel store IO:
```
let manifestPaths = list("$manifestDirectory/*-manifest.json")
let manifests = manifestPaths.map(p -> loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)

directoriesToCreate.map(p -> mkdirs(p))
filesToRename.map((src, dest, etag) -> rename(src, dest, etag))

if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save("$destDir/_SUCCESS")
```
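As an illustration of that thread pool (not the committer's actual code; `TaskManifestModel.FileEntry` is the assumed model sketched earlier), the rename stage could be parallelised like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: submit one rename per file entry to a fixed-size thread pool,
// then wait for all of them so that any failure fails the job commit.
final class ParallelRenameSketch {
  static void renameAll(FileSystem fs, List<TaskManifestModel.FileEntry> files, int threads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (TaskManifestModel.FileEntry entry : files) {
        futures.add(pool.submit(() -> {
          fs.rename(new Path(entry.source), new Path(entry.destination));
          return null;
        }));
      }
      for (Future<?> f : futures) {
        f.get();   // propagate the first rename failure
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```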
Implementation Note: to aid debugging and development, the summary may be saved to a location in the same or a different filesystem; the intermediate manifests may be renamed to a location in the target filesystem.
```
if summary.report.directory != "" then
  success.save("${summary.report.directory}/$jobID.json")

if diagnostics.manifest.directory != null then
  rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")
```
The summary report is saved even if job commit fails for any reason.
Job cleanup is nominally a matter of deleting the job directory:

```
delete(jobDirectory)
```
To address scale issues with the object stores, this SHALL be preceded by a (parallelized) deletion of all task attempt working directories:

```
let taskAttemptWorkingDirectories = list("$taskAttemptDirectory")
taskAttemptWorkingDirectories.map(p -> delete(p))
```