/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

import com.google.common.annotations.VisibleForTesting;

/** An {@link OutputCommitter} that commits files specified
 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 *
 * <p>Two commit algorithms are supported, selected through
 * {@link #FILEOUTPUTCOMMITTER_ALGORITHM_VERSION}:
 * <ul>
 *   <li><b>1</b>: {@link #commitTask(TaskAttemptContext)} renames the task
 *   attempt directory to the committed task directory of the job attempt, and
 *   {@link #commitJob(JobContext)} merges every committed task directory into
 *   the output directory.</li>
 *   <li><b>2</b>: {@link #commitTask(TaskAttemptContext)} merges the task
 *   attempt directory straight into the output directory, and job commit is
 *   reported as repeatable (see {@link #isCommitJobRepeatable(JobContext)}).
 *   </li>
 * </ul>
 *
 * <p>The pending data layout under the output directory is
 * {@code _temporary/<appAttemptId>/_temporary/<taskAttemptId>} for running
 * task attempts and {@code _temporary/<appAttemptId>/<taskId>} for committed
 * tasks.
 **/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);

  /**
   * Name of directory where pending data is placed.  Data that has not been
   * committed yet.
   */
  public static final String PENDING_DIR_NAME = "_temporary";
  /**
   * Temporary directory name
   *
   * The static variable to be compatible with M/R 1.x
   */
  @Deprecated
  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
  /** Name of the marker file created in the output directory on job commit. */
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
  /** Configuration key controlling whether the marker file is created. */
  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
  /** Configuration key selecting the commit algorithm (1 or 2). */
  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
      "mapreduce.fileoutputcommitter.algorithm.version";
  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
  // Number of attempts when failure happens in commit job
  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
      "mapreduce.fileoutputcommitter.failures.attempts";

  // default value to be 1 to keep consistent with previous behavior
  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;

  /** Fully qualified job output path; null makes the committer a no-op. */
  private final Path outputPath;
  /** Task attempt work directory; only set by the task-level constructor. */
  private Path workPath = null;
  private final int algorithmVersion;

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop.
   * @param context the task's context
   * @throws IOException if the algorithm version is unsupported or the
   * output file system cannot be obtained
   */
  public FileOutputCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop.
   * @param context the job's context
   * @throws IOException if the algorithm version is unsupported or the
   * output file system cannot be obtained
   */
  @Private
  public FileOutputCommitter(Path outputPath,
      JobContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    algorithmVersion =
        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
            FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
    if (algorithmVersion != 1 && algorithmVersion != 2) {
      throw new IOException("Only 1 or 2 algorithm version is supported");
    }
    if (outputPath != null) {
      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
      this.outputPath = fs.makeQualified(outputPath);
    } else {
      this.outputPath = null;
    }
  }

  /**
   * @return the path where final output of the job should be placed.  This
   * could also be considered the committed application attempt path.
   */
  private Path getOutputPath() {
    return this.outputPath;
  }

  /**
   * @return true if we have an output path set, else false.
   */
  private boolean hasOutputPath() {
    return this.outputPath != null;
  }

  /**
   * @return the path where the output of pending job attempts are
   * stored.
   */
  private Path getPendingJobAttemptsPath() {
    return getPendingJobAttemptsPath(getOutputPath());
  }

  /**
   * Get the location of pending job attempts.
   * @param out the base output directory.
   * @return the location of pending job attempts.
   */
  private static Path getPendingJobAttemptsPath(Path out) {
    return new Path(out, PENDING_DIR_NAME);
  }

  /**
   * Get the Application Attempt Id for this job
   * @param context the context to look in
   * @return the Application Attempt Id for a given job, or 0 when the
   * configuration does not define one.
   */
  private static int getAppAttemptId(JobContext context) {
    return context.getConfiguration().getInt(
        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @return the path to store job attempt data.
   */
  public Path getJobAttemptPath(JobContext context) {
    return getJobAttemptPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  public static Path getJobAttemptPath(JobContext context, Path out) {
    return getJobAttemptPath(getAppAttemptId(context), out);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @return the path to store job attempt data.
   */
  protected Path getJobAttemptPath(int appAttemptId) {
    return getJobAttemptPath(appAttemptId, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  private static Path getJobAttemptPath(int appAttemptId, Path out) {
    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @return the path where the output of pending task attempts are stored.
   */
  private Path getPendingTaskAttemptsPath(JobContext context) {
    return getPendingTaskAttemptsPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @param out the output path to place these in.
   * @return the path where the output of pending task attempts are stored.
   */
  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  public Path getTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getPendingTaskAttemptsPath(context),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @param out The output path to put things in.
   * @return the path where a task attempt should be stored.
   */
  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
    return new Path(getPendingTaskAttemptsPath(context, out),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public Path getCommittedTaskPath(TaskAttemptContext context) {
    return getCommittedTaskPath(getAppAttemptId(context), context);
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @param out the output path to place these in.
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
    return getCommittedTaskPath(getAppAttemptId(context), context, out);
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @return the path where the output of a committed task is stored.
   */
  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
    return new Path(getJobAttemptPath(appAttemptId),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @param out the output path to place these in.
   * @return the path where the output of a committed task is stored.
   */
  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
    return new Path(getJobAttemptPath(appAttemptId, out),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Accepts every entry of a job attempt directory except the nested
   * pending-task-attempts directory.
   */
  private static class CommittedTaskFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return !PENDING_DIR_NAME.equals(path.getName());
    }
  }

  /**
   * Get a list of all paths where output from committed tasks are stored.
   * @param context the context of the current job
   * @return the list of these Paths/FileStatuses.
   * @throws IOException if the job attempt directory cannot be listed
   */
  private FileStatus[] getAllCommittedTaskPaths(JobContext context)
      throws IOException {
    Path jobAttemptPath = getJobAttemptPath(context);
    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
  }

  /**
   * Get the directory that the task should write results into.
   * @return the work directory, or null when this committer was created
   * without an output path or through the job-level constructor
   * @throws IOException declared for subclasses; not thrown here
   */
  public Path getWorkPath() throws IOException {
    return workPath;
  }

  /**
   * Create the temporary directory that is the root of all of the task
   * work directories.  A failure to create the directory is logged but not
   * propagated.
   * @param context the job's context
   */
  @Override
  public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");
    }
  }

  /**
   * The job has completed, so do works in commitJobInternal().
   * Could retry on failure if using algorithm 2.
   *
   * <p>When the commit is repeatable the number of attempts is read from
   * {@link #FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS}; otherwise exactly one
   * attempt is made.  The exception of the last failed attempt is rethrown.
   * @param context the job's context
   */
  @Override
  public void commitJob(JobContext context) throws IOException {
    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
    int attempt = 0;
    boolean jobCommitNotFinished = true;
    while (jobCommitNotFinished) {
      try {
        commitJobInternal(context);
        jobCommitNotFinished = false;
      } catch (Exception e) {
        // Unchecked exceptions are retried as well; the precise rethrow keeps
        // the declared exception type at IOException.
        if (++attempt >= maxAttemptsOnFailure) {
          throw e;
        } else {
          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
              ") time.", e);
        }
      }
    }
  }

  /**
   * The job has completed, so do following commit job, include:
   * Move all committed tasks to the final output dir (algorithm 1 only).
   * Delete the temporary directory, including all of the work directories.
   * Create a _SUCCESS file to make it as successful.
   * @param context the job's context
   */
  @VisibleForTesting
  protected void commitJobInternal(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path finalOutput = getOutputPath();
      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());

      if (algorithmVersion == 1) {
        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
          mergePaths(fs, stat, finalOutput);
        }
      }

      // delete the _temporary folder and create a _SUCCESS file in the
      // output folder
      cleanupJob(context);
      // True if the job requires output.dir marked on successful job.
      // Note that by default it is set to true.
      if (context.getConfiguration().getBoolean(
          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
        Path markerPath = new Path(finalOutput, SUCCEEDED_FILE_NAME);
        // If job commit is repeatable and previous/another AM could write
        // mark file already, we need to set overwritten to be true explicitly
        // in case other FS implementations don't overwritten by default.
        if (isCommitJobRepeatable(context)) {
          fs.create(markerPath, true).close();
        } else {
          fs.create(markerPath).close();
        }
      }
    } else {
      LOG.warn("Output Path is null in commitJob()");
    }
  }

  /**
   * Merge two paths together.  Anything in from will be moved into to, if there
   * are any name conflicts while merging the files or directories in from win.
   * @param fs the File System to use
   * @param from the path data is coming from.
   * @param to the path data is going to.
   * @throws IOException on any error
   */
  private void mergePaths(FileSystem fs, final FileStatus from,
      final Path to) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Merging data from " + from + " to " + to);
    }
    FileStatus toStat;
    try {
      toStat = fs.getFileStatus(to);
    } catch (FileNotFoundException fnfe) {
      toStat = null;
    }

    if (from.isFile()) {
      // A source file replaces whatever currently exists at the destination.
      if (toStat != null) {
        if (!fs.delete(to, true)) {
          throw new IOException("Failed to delete " + to);
        }
      }

      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename " + from + " to " + to);
      }
    } else if (from.isDirectory()) {
      if (toStat != null) {
        if (!toStat.isDirectory()) {
          // Destination is a file: the source directory wins.
          if (!fs.delete(to, true)) {
            throw new IOException("Failed to delete " + to);
          }
          renameOrMerge(fs, from, to);
        } else {
          //It is a directory so merge everything in the directories
          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
            Path subTo = new Path(to, subFrom.getPath().getName());
            mergePaths(fs, subFrom, subTo);
          }
        }
      } else {
        renameOrMerge(fs, from, to);
      }
    }
  }

  /**
   * Move the directory {@code from} to the destination {@code to}.  With
   * algorithm 1 the directory is renamed as a whole; with algorithm 2 the
   * destination directory is created and every child is merged into it
   * individually.
   * @param fs the File System to use
   * @param from the directory data is coming from.
   * @param to the path data is going to.
   * @throws IOException if the rename fails, the destination directory
   * cannot be created, or merging a child fails
   */
  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
      throws IOException {
    if (algorithmVersion == 1) {
      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename " + from + " to " + to);
      }
    } else {
      // Do not ignore a failed mkdirs: for a source directory without
      // children nothing else would notice the missing destination.
      if (!fs.mkdirs(to)) {
        throw new IOException("Mkdirs failed to create " + to);
      }
      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
        Path subTo = new Path(to, subFrom.getPath().getName());
        mergePaths(fs, subFrom, subTo);
      }
    }
  }

  /**
   * Delete the directory holding all pending job attempts, i.e. the
   * {@link #PENDING_DIR_NAME} directory below the output path.
   * @param context the job's context
   */
  @Override
  @Deprecated
  public void cleanupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
      FileSystem fs = pendingJobAttemptsPath
          .getFileSystem(context.getConfiguration());
      // if job allow repeatable commit and pendingJobAttemptsPath could be
      // deleted by previous AM, we should tolerate FileNotFoundException in
      // this case.
      try {
        fs.delete(pendingJobAttemptsPath, true);
      } catch (FileNotFoundException e) {
        if (!isCommitJobRepeatable(context)) {
          throw e;
        }
      }
    } else {
      LOG.warn("Output Path is null in cleanupJob()");
    }
  }

  /**
   * Delete the temporary directory, including all of the work directories.
   * @param context the job's context
   * @param state final run state of the job; not used by this committer
   */
  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    // delete the _temporary folder
    cleanupJob(context);
  }

  /**
   * No task setup required.
   */
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    // FileOutputCommitter's setupTask doesn't do anything. Because the
    // temporary task directory is created on demand when the
    // task is writing.
  }

  /**
   * Move the files from the work directory to the job output directory
   * @param context the task context
   */
  @Override
  public void commitTask(TaskAttemptContext context)
      throws IOException {
    commitTask(context, null);
  }

  /**
   * Commit the output of a task attempt.  With algorithm 1 the task attempt
   * directory is renamed to the committed task path, replacing any previous
   * content there; with algorithm 2 it is merged directly into the output
   * directory.  Nothing is done besides logging when the task attempt
   * directory does not exist or no output path is set.
   * @param context the task context
   * @param taskAttemptPath the task attempt directory, or null to derive it
   * from the context
   * @throws IOException if a delete, rename or merge fails
   */
  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
      throws IOException {

    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      FileStatus taskAttemptDirStatus;
      try {
        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
      } catch (FileNotFoundException e) {
        taskAttemptDirStatus = null;
      }

      if (taskAttemptDirStatus != null) {
        if (algorithmVersion == 1) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
            if (!fs.delete(committedTaskPath, true)) {
              throw new IOException("Could not delete " + committedTaskPath);
            }
          }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
            throw new IOException("Could not rename " + taskAttemptPath + " to "
                + committedTaskPath);
          }
          LOG.info("Saved output of task '" + attemptId + "' to " +
              committedTaskPath);
        } else {
          // directly merge everything from taskAttemptPath to output directory
          mergePaths(fs, taskAttemptDirStatus, outputPath);
          LOG.info("Saved output of task '" + attemptId + "' to " +
              outputPath);
        }
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

  /**
   * Delete the work directory
   * @throws IOException
   */
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    abortTask(context, null);
  }

  /**
   * Delete the given task attempt directory.  A delete that reports failure
   * is logged as a warning only.
   * @param context the task context
   * @param taskAttemptPath the task attempt directory, or null to derive it
   * from the context
   * @throws IOException if the file system cannot be accessed
   */
  @Private
  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) {
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

  /**
   * Did this task write any files in the work directory?
   * @param context the task's context
   */
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context
      ) throws IOException {
    return needsTaskCommit(context, null);
  }

  /**
   * Check whether the task attempt directory exists.
   * @param context the task's context
   * @param taskAttemptPath the task attempt directory, or null to derive it
   * from the context
   * @return true if an output path is set and the task attempt directory
   * exists, else false
   * @throws IOException if the file system cannot be accessed
   */
  @Private
  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
      ) throws IOException {
    if(hasOutputPath()) {
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      return fs.exists(taskAttemptPath);
    }
    return false;
  }

  @Override
  @Deprecated
  public boolean isRecoverySupported() {
    return true;
  }

  /**
   * @return true only for algorithm version 2.
   */
  @Override
  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
    return algorithmVersion == 2;
  }

  /**
   * Recover the output a task committed during the previous application
   * attempt.  With algorithm 1 the previously committed task directory is
   * renamed into the current job attempt; with algorithm 2 any content left
   * in the previously committed task directory is merged into the output
   * directory.
   * @param context the task's context
   * @throws IOException if this is the first application attempt, or a
   * delete, rename or merge fails
   */
  @Override
  public void recoverTask(TaskAttemptContext context)
      throws IOException {
    if(hasOutputPath()) {
      context.progress();
      TaskAttemptID attemptId = context.getTaskAttemptID();
      int previousAttempt = getAppAttemptId(context) - 1;
      if (previousAttempt < 0) {
        throw new IOException ("Cannot recover task output for first attempt...");
      }

      Path previousCommittedTaskPath = getCommittedTaskPath(
          previousAttempt, context);
      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
      }
      if (algorithmVersion == 1) {
        if (fs.exists(previousCommittedTaskPath)) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
            if (!fs.delete(committedTaskPath, true)) {
              throw new IOException("Could not delete "+committedTaskPath);
            }
          }
          //Rename can fail if the parent directory does not yet exist.
          Path committedParent = committedTaskPath.getParent();
          fs.mkdirs(committedParent);
          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
            throw new IOException("Could not rename " + previousCommittedTaskPath +
                " to " + committedTaskPath);
          }
        } else {
          LOG.warn(attemptId+" had no output to recover.");
        }
      } else {
        // essentially a no-op, but for backwards compatibility
        // after upgrade to the new fileOutputCommitter,
        // check if there are any output left in committedTaskPath
        //
        // A single getFileStatus call replaces an exists()/getFileStatus()
        // pair: one round trip, and no failure if the path vanishes between
        // the two calls.
        FileStatus from;
        try {
          from = fs.getFileStatus(previousCommittedTaskPath);
        } catch (FileNotFoundException e) {
          from = null;
        }
        if (from != null) {
          LOG.info("Recovering task for upgrading scenario, moving files from "
              + previousCommittedTaskPath + " to " + outputPath);
          mergePaths(fs, from, outputPath);
        }
        LOG.info("Done recovering task " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in recoverTask()");
    }
  }
}