001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceAudience.Private; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.fs.PathFilter; 034import org.apache.hadoop.mapreduce.JobContext; 035import org.apache.hadoop.mapreduce.JobStatus; 036import org.apache.hadoop.mapreduce.MRJobConfig; 037import org.apache.hadoop.mapreduce.OutputCommitter; 038import org.apache.hadoop.mapreduce.TaskAttemptContext; 039import org.apache.hadoop.mapreduce.TaskAttemptID; 040 041import com.google.common.annotations.VisibleForTesting; 042 043/** An {@link OutputCommitter} that commits files specified 044 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 045 **/ 046@InterfaceAudience.Public 047@InterfaceStability.Stable 048public class FileOutputCommitter extends OutputCommitter { 049 private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class); 050 051 /** 052 * Name of directory where pending data is placed. Data that has not been 053 * committed yet. 054 */ 055 public static final String PENDING_DIR_NAME = "_temporary"; 056 /** 057 * Temporary directory name 058 * 059 * The static variable to be compatible with M/R 1.x 060 */ 061 @Deprecated 062 protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME; 063 public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; 064 public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 065 "mapreduce.fileoutputcommitter.marksuccessfuljobs"; 066 public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = 067 "mapreduce.fileoutputcommitter.algorithm.version"; 068 public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1; 069 // Skip cleanup _temporary folders under job's output directory 070 public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED = 071 "mapreduce.fileoutputcommitter.cleanup.skipped"; 072 public static final boolean 073 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false; 074 075 // Ignore exceptions in cleanup _temporary folder under job's output directory 076 public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED = 077 "mapreduce.fileoutputcommitter.cleanup-failures.ignored"; 078 public static final boolean 079 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false; 080 081 // Number of attempts when failure happens in commit job 082 public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS = 083 "mapreduce.fileoutputcommitter.failures.attempts"; 084 085 // default value to be 1 to keep consistent with previous behavior 086 public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1; 087 088 private Path outputPath = null; 089 private Path workPath = null; 090 private final int algorithmVersion; 091 private final boolean skipCleanup; 092 private final boolean ignoreCleanupFailures; 093 094 /** 095 * Create a file output committer 096 * @param outputPath the job's output path, or null if you want the output 097 * committer to act as a noop. 098 * @param context the task's context 099 * @throws IOException 100 */ 101 public FileOutputCommitter(Path outputPath, 102 TaskAttemptContext context) throws IOException { 103 this(outputPath, (JobContext)context); 104 if (outputPath != null) { 105 workPath = getTaskAttemptPath(context, outputPath); 106 } 107 } 108 109 /** 110 * Create a file output committer 111 * @param outputPath the job's output path, or null if you want the output 112 * committer to act as a noop. 113 * @param context the task's context 114 * @throws IOException 115 */ 116 @Private 117 public FileOutputCommitter(Path outputPath, 118 JobContext context) throws IOException { 119 Configuration conf = context.getConfiguration(); 120 algorithmVersion = 121 conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 122 FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT); 123 LOG.info("File Output Committer Algorithm version is " + algorithmVersion); 124 if (algorithmVersion != 1 && algorithmVersion != 2) { 125 throw new IOException("Only 1 or 2 algorithm version is supported"); 126 } 127 128 // if skip cleanup 129 skipCleanup = conf.getBoolean( 130 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED, 131 FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT); 132 133 // if ignore failures in cleanup 134 ignoreCleanupFailures = conf.getBoolean( 135 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED, 136 FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT); 137 138 LOG.info("FileOutputCommitter skip cleanup _temporary folders under " + 139 "output directory:" + skipCleanup + ", ignore cleanup failures: " + 140 ignoreCleanupFailures); 141 142 if (outputPath != null) { 143 FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); 144 this.outputPath = fs.makeQualified(outputPath); 145 } 146 } 147 148 /** 149 * @return the path where final output of the job should be placed. This 150 * could also be considered the committed application attempt path. 151 */ 152 private Path getOutputPath() { 153 return this.outputPath; 154 } 155 156 /** 157 * @return true if we have an output path set, else false. 158 */ 159 private boolean hasOutputPath() { 160 return this.outputPath != null; 161 } 162 163 /** 164 * @return the path where the output of pending job attempts are 165 * stored. 166 */ 167 private Path getPendingJobAttemptsPath() { 168 return getPendingJobAttemptsPath(getOutputPath()); 169 } 170 171 /** 172 * Get the location of pending job attempts. 173 * @param out the base output directory. 174 * @return the location of pending job attempts. 175 */ 176 private static Path getPendingJobAttemptsPath(Path out) { 177 return new Path(out, PENDING_DIR_NAME); 178 } 179 180 /** 181 * Get the Application Attempt Id for this job 182 * @param context the context to look in 183 * @return the Application Attempt Id for a given job. 184 */ 185 private static int getAppAttemptId(JobContext context) { 186 return context.getConfiguration().getInt( 187 MRJobConfig.APPLICATION_ATTEMPT_ID, 0); 188 } 189 190 /** 191 * Compute the path where the output of a given job attempt will be placed. 192 * @param context the context of the job. This is used to get the 193 * application attempt id. 194 * @return the path to store job attempt data. 195 */ 196 public Path getJobAttemptPath(JobContext context) { 197 return getJobAttemptPath(context, getOutputPath()); 198 } 199 200 /** 201 * Compute the path where the output of a given job attempt will be placed. 202 * @param context the context of the job. This is used to get the 203 * application attempt id. 204 * @param out the output path to place these in. 205 * @return the path to store job attempt data. 206 */ 207 public static Path getJobAttemptPath(JobContext context, Path out) { 208 return getJobAttemptPath(getAppAttemptId(context), out); 209 } 210 211 /** 212 * Compute the path where the output of a given job attempt will be placed. 213 * @param appAttemptId the ID of the application attempt for this job. 214 * @return the path to store job attempt data. 215 */ 216 protected Path getJobAttemptPath(int appAttemptId) { 217 return getJobAttemptPath(appAttemptId, getOutputPath()); 218 } 219 220 /** 221 * Compute the path where the output of a given job attempt will be placed. 222 * @param appAttemptId the ID of the application attempt for this job. 223 * @return the path to store job attempt data. 224 */ 225 private static Path getJobAttemptPath(int appAttemptId, Path out) { 226 return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId)); 227 } 228 229 /** 230 * Compute the path where the output of pending task attempts are stored. 231 * @param context the context of the job with pending tasks. 232 * @return the path where the output of pending task attempts are stored. 233 */ 234 private Path getPendingTaskAttemptsPath(JobContext context) { 235 return getPendingTaskAttemptsPath(context, getOutputPath()); 236 } 237 238 /** 239 * Compute the path where the output of pending task attempts are stored. 240 * @param context the context of the job with pending tasks. 241 * @return the path where the output of pending task attempts are stored. 242 */ 243 private static Path getPendingTaskAttemptsPath(JobContext context, Path out) { 244 return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME); 245 } 246 247 /** 248 * Compute the path where the output of a task attempt is stored until 249 * that task is committed. 250 * 251 * @param context the context of the task attempt. 252 * @return the path where a task attempt should be stored. 253 */ 254 public Path getTaskAttemptPath(TaskAttemptContext context) { 255 return new Path(getPendingTaskAttemptsPath(context), 256 String.valueOf(context.getTaskAttemptID())); 257 } 258 259 /** 260 * Compute the path where the output of a task attempt is stored until 261 * that task is committed. 262 * 263 * @param context the context of the task attempt. 264 * @param out The output path to put things in. 265 * @return the path where a task attempt should be stored. 266 */ 267 public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) { 268 return new Path(getPendingTaskAttemptsPath(context, out), 269 String.valueOf(context.getTaskAttemptID())); 270 } 271 272 /** 273 * Compute the path where the output of a committed task is stored until 274 * the entire job is committed. 275 * @param context the context of the task attempt 276 * @return the path where the output of a committed task is stored until 277 * the entire job is committed. 278 */ 279 public Path getCommittedTaskPath(TaskAttemptContext context) { 280 return getCommittedTaskPath(getAppAttemptId(context), context); 281 } 282 283 public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) { 284 return getCommittedTaskPath(getAppAttemptId(context), context, out); 285 } 286 287 /** 288 * Compute the path where the output of a committed task is stored until the 289 * entire job is committed for a specific application attempt. 290 * @param appAttemptId the id of the application attempt to use 291 * @param context the context of any task. 292 * @return the path where the output of a committed task is stored. 293 */ 294 protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { 295 return new Path(getJobAttemptPath(appAttemptId), 296 String.valueOf(context.getTaskAttemptID().getTaskID())); 297 } 298 299 private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) { 300 return new Path(getJobAttemptPath(appAttemptId, out), 301 String.valueOf(context.getTaskAttemptID().getTaskID())); 302 } 303 304 private static class CommittedTaskFilter implements PathFilter { 305 @Override 306 public boolean accept(Path path) { 307 return !PENDING_DIR_NAME.equals(path.getName()); 308 } 309 } 310 311 /** 312 * Get a list of all paths where output from committed tasks are stored. 313 * @param context the context of the current job 314 * @return the list of these Paths/FileStatuses. 315 * @throws IOException 316 */ 317 private FileStatus[] getAllCommittedTaskPaths(JobContext context) 318 throws IOException { 319 Path jobAttemptPath = getJobAttemptPath(context); 320 FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration()); 321 return fs.listStatus(jobAttemptPath, new CommittedTaskFilter()); 322 } 323 324 /** 325 * Get the directory that the task should write results into. 326 * @return the work directory 327 * @throws IOException 328 */ 329 public Path getWorkPath() throws IOException { 330 return workPath; 331 } 332 333 /** 334 * Create the temporary directory that is the root of all of the task 335 * work directories. 336 * @param context the job's context 337 */ 338 public void setupJob(JobContext context) throws IOException { 339 if (hasOutputPath()) { 340 Path jobAttemptPath = getJobAttemptPath(context); 341 FileSystem fs = jobAttemptPath.getFileSystem( 342 context.getConfiguration()); 343 if (!fs.mkdirs(jobAttemptPath)) { 344 LOG.error("Mkdirs failed to create " + jobAttemptPath); 345 } 346 } else { 347 LOG.warn("Output Path is null in setupJob()"); 348 } 349 } 350 351 /** 352 * The job has completed, so do works in commitJobInternal(). 353 * Could retry on failure if using algorithm 2. 354 * @param context the job's context 355 */ 356 public void commitJob(JobContext context) throws IOException { 357 int maxAttemptsOnFailure = isCommitJobRepeatable(context) ? 358 context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS, 359 FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1; 360 int attempt = 0; 361 boolean jobCommitNotFinished = true; 362 while (jobCommitNotFinished) { 363 try { 364 commitJobInternal(context); 365 jobCommitNotFinished = false; 366 } catch (Exception e) { 367 if (++attempt >= maxAttemptsOnFailure) { 368 throw e; 369 } else { 370 LOG.warn("Exception get thrown in job commit, retry (" + attempt + 371 ") time.", e); 372 } 373 } 374 } 375 } 376 377 /** 378 * The job has completed, so do following commit job, include: 379 * Move all committed tasks to the final output dir (algorithm 1 only). 380 * Delete the temporary directory, including all of the work directories. 381 * Create a _SUCCESS file to make it as successful. 382 * @param context the job's context 383 */ 384 @VisibleForTesting 385 protected void commitJobInternal(JobContext context) throws IOException { 386 if (hasOutputPath()) { 387 Path finalOutput = getOutputPath(); 388 FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); 389 390 if (algorithmVersion == 1) { 391 for (FileStatus stat: getAllCommittedTaskPaths(context)) { 392 mergePaths(fs, stat, finalOutput); 393 } 394 } 395 396 if (skipCleanup) { 397 LOG.info("Skip cleanup the _temporary folders under job's output " + 398 "directory in commitJob."); 399 } else { 400 // delete the _temporary folder and create a _done file in the o/p 401 // folder 402 try { 403 cleanupJob(context); 404 } catch (IOException e) { 405 if (ignoreCleanupFailures) { 406 // swallow exceptions in cleanup as user configure to make sure 407 // commitJob could be success even when cleanup get failure. 408 LOG.error("Error in cleanup job, manually cleanup is needed.", e); 409 } else { 410 // throw back exception to fail commitJob. 411 throw e; 412 } 413 } 414 } 415 // True if the job requires output.dir marked on successful job. 416 // Note that by default it is set to true. 417 if (context.getConfiguration().getBoolean( 418 SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { 419 Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); 420 // If job commit is repeatable and previous/another AM could write 421 // mark file already, we need to set overwritten to be true explicitly 422 // in case other FS implementations don't overwritten by default. 423 if (isCommitJobRepeatable(context)) { 424 fs.create(markerPath, true).close(); 425 } else { 426 fs.create(markerPath).close(); 427 } 428 } 429 } else { 430 LOG.warn("Output Path is null in commitJob()"); 431 } 432 } 433 434 /** 435 * Merge two paths together. Anything in from will be moved into to, if there 436 * are any name conflicts while merging the files or directories in from win. 437 * @param fs the File System to use 438 * @param from the path data is coming from. 439 * @param to the path data is going to. 440 * @throws IOException on any error 441 */ 442 private void mergePaths(FileSystem fs, final FileStatus from, 443 final Path to) throws IOException { 444 if (LOG.isDebugEnabled()) { 445 LOG.debug("Merging data from " + from + " to " + to); 446 } 447 FileStatus toStat; 448 try { 449 toStat = fs.getFileStatus(to); 450 } catch (FileNotFoundException fnfe) { 451 toStat = null; 452 } 453 454 if (from.isFile()) { 455 if (toStat != null) { 456 if (!fs.delete(to, true)) { 457 throw new IOException("Failed to delete " + to); 458 } 459 } 460 461 if (!fs.rename(from.getPath(), to)) { 462 throw new IOException("Failed to rename " + from + " to " + to); 463 } 464 } else if (from.isDirectory()) { 465 if (toStat != null) { 466 if (!toStat.isDirectory()) { 467 if (!fs.delete(to, true)) { 468 throw new IOException("Failed to delete " + to); 469 } 470 renameOrMerge(fs, from, to); 471 } else { 472 //It is a directory so merge everything in the directories 473 for (FileStatus subFrom : fs.listStatus(from.getPath())) { 474 Path subTo = new Path(to, subFrom.getPath().getName()); 475 mergePaths(fs, subFrom, subTo); 476 } 477 } 478 } else { 479 renameOrMerge(fs, from, to); 480 } 481 } 482 } 483 484 private void renameOrMerge(FileSystem fs, FileStatus from, Path to) 485 throws IOException { 486 if (algorithmVersion == 1) { 487 if (!fs.rename(from.getPath(), to)) { 488 throw new IOException("Failed to rename " + from + " to " + to); 489 } 490 } else { 491 fs.mkdirs(to); 492 for (FileStatus subFrom : fs.listStatus(from.getPath())) { 493 Path subTo = new Path(to, subFrom.getPath().getName()); 494 mergePaths(fs, subFrom, subTo); 495 } 496 } 497 } 498 499 @Override 500 @Deprecated 501 public void cleanupJob(JobContext context) throws IOException { 502 if (hasOutputPath()) { 503 Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); 504 FileSystem fs = pendingJobAttemptsPath 505 .getFileSystem(context.getConfiguration()); 506 // if job allow repeatable commit and pendingJobAttemptsPath could be 507 // deleted by previous AM, we should tolerate FileNotFoundException in 508 // this case. 509 try { 510 fs.delete(pendingJobAttemptsPath, true); 511 } catch (FileNotFoundException e) { 512 if (!isCommitJobRepeatable(context)) { 513 throw e; 514 } 515 } 516 } else { 517 LOG.warn("Output Path is null in cleanupJob()"); 518 } 519 } 520 521 /** 522 * Delete the temporary directory, including all of the work directories. 523 * @param context the job's context 524 */ 525 @Override 526 public void abortJob(JobContext context, JobStatus.State state) 527 throws IOException { 528 // delete the _temporary folder 529 cleanupJob(context); 530 } 531 532 /** 533 * No task setup required. 534 */ 535 @Override 536 public void setupTask(TaskAttemptContext context) throws IOException { 537 // FileOutputCommitter's setupTask doesn't do anything. Because the 538 // temporary task directory is created on demand when the 539 // task is writing. 540 } 541 542 /** 543 * Move the files from the work directory to the job output directory 544 * @param context the task context 545 */ 546 @Override 547 public void commitTask(TaskAttemptContext context) 548 throws IOException { 549 commitTask(context, null); 550 } 551 552 @Private 553 public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 554 throws IOException { 555 556 TaskAttemptID attemptId = context.getTaskAttemptID(); 557 if (hasOutputPath()) { 558 context.progress(); 559 if(taskAttemptPath == null) { 560 taskAttemptPath = getTaskAttemptPath(context); 561 } 562 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 563 FileStatus taskAttemptDirStatus; 564 try { 565 taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath); 566 } catch (FileNotFoundException e) { 567 taskAttemptDirStatus = null; 568 } 569 570 if (taskAttemptDirStatus != null) { 571 if (algorithmVersion == 1) { 572 Path committedTaskPath = getCommittedTaskPath(context); 573 if (fs.exists(committedTaskPath)) { 574 if (!fs.delete(committedTaskPath, true)) { 575 throw new IOException("Could not delete " + committedTaskPath); 576 } 577 } 578 if (!fs.rename(taskAttemptPath, committedTaskPath)) { 579 throw new IOException("Could not rename " + taskAttemptPath + " to " 580 + committedTaskPath); 581 } 582 LOG.info("Saved output of task '" + attemptId + "' to " + 583 committedTaskPath); 584 } else { 585 // directly merge everything from taskAttemptPath to output directory 586 mergePaths(fs, taskAttemptDirStatus, outputPath); 587 LOG.info("Saved output of task '" + attemptId + "' to " + 588 outputPath); 589 } 590 } else { 591 LOG.warn("No Output found for " + attemptId); 592 } 593 } else { 594 LOG.warn("Output Path is null in commitTask()"); 595 } 596 } 597 598 /** 599 * Delete the work directory 600 * @throws IOException 601 */ 602 @Override 603 public void abortTask(TaskAttemptContext context) throws IOException { 604 abortTask(context, null); 605 } 606 607 @Private 608 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException { 609 if (hasOutputPath()) { 610 context.progress(); 611 if(taskAttemptPath == null) { 612 taskAttemptPath = getTaskAttemptPath(context); 613 } 614 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 615 if(!fs.delete(taskAttemptPath, true)) { 616 LOG.warn("Could not delete "+taskAttemptPath); 617 } 618 } else { 619 LOG.warn("Output Path is null in abortTask()"); 620 } 621 } 622 623 /** 624 * Did this task write any files in the work directory? 625 * @param context the task's context 626 */ 627 @Override 628 public boolean needsTaskCommit(TaskAttemptContext context 629 ) throws IOException { 630 return needsTaskCommit(context, null); 631 } 632 633 @Private 634 public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath 635 ) throws IOException { 636 if(hasOutputPath()) { 637 if(taskAttemptPath == null) { 638 taskAttemptPath = getTaskAttemptPath(context); 639 } 640 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 641 return fs.exists(taskAttemptPath); 642 } 643 return false; 644 } 645 646 @Override 647 @Deprecated 648 public boolean isRecoverySupported() { 649 return true; 650 } 651 652 @Override 653 public boolean isCommitJobRepeatable(JobContext context) throws IOException { 654 return algorithmVersion == 2; 655 } 656 657 @Override 658 public void recoverTask(TaskAttemptContext context) 659 throws IOException { 660 if(hasOutputPath()) { 661 context.progress(); 662 TaskAttemptID attemptId = context.getTaskAttemptID(); 663 int previousAttempt = getAppAttemptId(context) - 1; 664 if (previousAttempt < 0) { 665 throw new IOException ("Cannot recover task output for first attempt..."); 666 } 667 668 Path previousCommittedTaskPath = getCommittedTaskPath( 669 previousAttempt, context); 670 FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration()); 671 if (LOG.isDebugEnabled()) { 672 LOG.debug("Trying to recover task from " + previousCommittedTaskPath); 673 } 674 if (algorithmVersion == 1) { 675 if (fs.exists(previousCommittedTaskPath)) { 676 Path committedTaskPath = getCommittedTaskPath(context); 677 if (fs.exists(committedTaskPath)) { 678 if (!fs.delete(committedTaskPath, true)) { 679 throw new IOException("Could not delete "+committedTaskPath); 680 } 681 } 682 //Rename can fail if the parent directory does not yet exist. 683 Path committedParent = committedTaskPath.getParent(); 684 fs.mkdirs(committedParent); 685 if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) { 686 throw new IOException("Could not rename " + previousCommittedTaskPath + 687 " to " + committedTaskPath); 688 } 689 } else { 690 LOG.warn(attemptId+" had no output to recover."); 691 } 692 } else { 693 // essentially a no-op, but for backwards compatibility 694 // after upgrade to the new fileOutputCommitter, 695 // check if there are any output left in committedTaskPath 696 if (fs.exists(previousCommittedTaskPath)) { 697 LOG.info("Recovering task for upgrading scenario, moving files from " 698 + previousCommittedTaskPath + " to " + outputPath); 699 FileStatus from = fs.getFileStatus(previousCommittedTaskPath); 700 mergePaths(fs, from, outputPath); 701 } 702 LOG.info("Done recovering task " + attemptId); 703 } 704 } else { 705 LOG.warn("Output Path is null in recoverTask()"); 706 } 707 } 708}