001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceAudience.Private;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.PathFilter;
034import org.apache.hadoop.mapreduce.JobContext;
035import org.apache.hadoop.mapreduce.JobStatus;
036import org.apache.hadoop.mapreduce.MRJobConfig;
037import org.apache.hadoop.mapreduce.OutputCommitter;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.TaskAttemptID;
040
041import com.google.common.annotations.VisibleForTesting;
042
043/** An {@link OutputCommitter} that commits files specified 
044 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
045 **/
046@InterfaceAudience.Public
047@InterfaceStability.Stable
048public class FileOutputCommitter extends OutputCommitter {
049  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
050
051  /** 
052   * Name of directory where pending data is placed.  Data that has not been
053   * committed yet.
054   */
055  public static final String PENDING_DIR_NAME = "_temporary";
056  /**
057   * Temporary directory name 
058   *
059   * The static variable to be compatible with M/R 1.x
060   */
061  @Deprecated
062  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
063  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
064  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
065      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
066  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
067      "mapreduce.fileoutputcommitter.algorithm.version";
068  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
069  // Skip cleanup _temporary folders under job's output directory
070  public static final String FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED =
071      "mapreduce.fileoutputcommitter.cleanup.skipped";
072  public static final boolean
073      FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT = false;
074
075  // Ignore exceptions in cleanup _temporary folder under job's output directory
076  public static final String FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED =
077      "mapreduce.fileoutputcommitter.cleanup-failures.ignored";
078  public static final boolean
079      FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT = false;
080
081  // Number of attempts when failure happens in commit job
082  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
083      "mapreduce.fileoutputcommitter.failures.attempts";
084
085  // default value to be 1 to keep consistent with previous behavior
086  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
087
088  private Path outputPath = null;
089  private Path workPath = null;
090  private final int algorithmVersion;
091  private final boolean skipCleanup;
092  private final boolean ignoreCleanupFailures;
093
094  /**
095   * Create a file output committer
096   * @param outputPath the job's output path, or null if you want the output
097   * committer to act as a noop.
098   * @param context the task's context
099   * @throws IOException
100   */
101  public FileOutputCommitter(Path outputPath, 
102                             TaskAttemptContext context) throws IOException {
103    this(outputPath, (JobContext)context);
104    if (outputPath != null) {
105      workPath = getTaskAttemptPath(context, outputPath);
106    }
107  }
108  
109  /**
110   * Create a file output committer
111   * @param outputPath the job's output path, or null if you want the output
112   * committer to act as a noop.
113   * @param context the task's context
114   * @throws IOException
115   */
116  @Private
117  public FileOutputCommitter(Path outputPath, 
118                             JobContext context) throws IOException {
119    Configuration conf = context.getConfiguration();
120    algorithmVersion =
121        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
122                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
123    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
124    if (algorithmVersion != 1 && algorithmVersion != 2) {
125      throw new IOException("Only 1 or 2 algorithm version is supported");
126    }
127
128    // if skip cleanup
129    skipCleanup = conf.getBoolean(
130        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED,
131        FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT);
132
133    // if ignore failures in cleanup
134    ignoreCleanupFailures = conf.getBoolean(
135        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED,
136        FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT);
137
138    LOG.info("FileOutputCommitter skip cleanup _temporary folders under " +
139        "output directory:" + skipCleanup + ", ignore cleanup failures: " +
140        ignoreCleanupFailures);
141
142    if (outputPath != null) {
143      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
144      this.outputPath = fs.makeQualified(outputPath);
145    }
146  }
147  
148  /**
149   * @return the path where final output of the job should be placed.  This
150   * could also be considered the committed application attempt path.
151   */
152  private Path getOutputPath() {
153    return this.outputPath;
154  }
155  
156  /**
157   * @return true if we have an output path set, else false.
158   */
159  private boolean hasOutputPath() {
160    return this.outputPath != null;
161  }
162  
163  /**
164   * @return the path where the output of pending job attempts are
165   * stored.
166   */
167  private Path getPendingJobAttemptsPath() {
168    return getPendingJobAttemptsPath(getOutputPath());
169  }
170  
171  /**
172   * Get the location of pending job attempts.
173   * @param out the base output directory.
174   * @return the location of pending job attempts.
175   */
176  private static Path getPendingJobAttemptsPath(Path out) {
177    return new Path(out, PENDING_DIR_NAME);
178  }
179  
180  /**
181   * Get the Application Attempt Id for this job
182   * @param context the context to look in
183   * @return the Application Attempt Id for a given job.
184   */
185  private static int getAppAttemptId(JobContext context) {
186    return context.getConfiguration().getInt(
187        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
188  }
189  
190  /**
191   * Compute the path where the output of a given job attempt will be placed. 
192   * @param context the context of the job.  This is used to get the
193   * application attempt id.
194   * @return the path to store job attempt data.
195   */
196  public Path getJobAttemptPath(JobContext context) {
197    return getJobAttemptPath(context, getOutputPath());
198  }
199  
200  /**
201   * Compute the path where the output of a given job attempt will be placed. 
202   * @param context the context of the job.  This is used to get the
203   * application attempt id.
204   * @param out the output path to place these in.
205   * @return the path to store job attempt data.
206   */
207  public static Path getJobAttemptPath(JobContext context, Path out) {
208    return getJobAttemptPath(getAppAttemptId(context), out);
209  }
210  
211  /**
212   * Compute the path where the output of a given job attempt will be placed. 
213   * @param appAttemptId the ID of the application attempt for this job.
214   * @return the path to store job attempt data.
215   */
216  protected Path getJobAttemptPath(int appAttemptId) {
217    return getJobAttemptPath(appAttemptId, getOutputPath());
218  }
219  
220  /**
221   * Compute the path where the output of a given job attempt will be placed. 
222   * @param appAttemptId the ID of the application attempt for this job.
223   * @return the path to store job attempt data.
224   */
225  private static Path getJobAttemptPath(int appAttemptId, Path out) {
226    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
227  }
228  
229  /**
230   * Compute the path where the output of pending task attempts are stored.
231   * @param context the context of the job with pending tasks. 
232   * @return the path where the output of pending task attempts are stored.
233   */
234  private Path getPendingTaskAttemptsPath(JobContext context) {
235    return getPendingTaskAttemptsPath(context, getOutputPath());
236  }
237  
238  /**
239   * Compute the path where the output of pending task attempts are stored.
240   * @param context the context of the job with pending tasks. 
241   * @return the path where the output of pending task attempts are stored.
242   */
243  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
244    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
245  }
246  
247  /**
248   * Compute the path where the output of a task attempt is stored until
249   * that task is committed.
250   * 
251   * @param context the context of the task attempt.
252   * @return the path where a task attempt should be stored.
253   */
254  public Path getTaskAttemptPath(TaskAttemptContext context) {
255    return new Path(getPendingTaskAttemptsPath(context), 
256        String.valueOf(context.getTaskAttemptID()));
257  }
258  
259  /**
260   * Compute the path where the output of a task attempt is stored until
261   * that task is committed.
262   * 
263   * @param context the context of the task attempt.
264   * @param out The output path to put things in.
265   * @return the path where a task attempt should be stored.
266   */
267  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
268    return new Path(getPendingTaskAttemptsPath(context, out), 
269        String.valueOf(context.getTaskAttemptID()));
270  }
271  
272  /**
273   * Compute the path where the output of a committed task is stored until
274   * the entire job is committed.
275   * @param context the context of the task attempt
276   * @return the path where the output of a committed task is stored until
277   * the entire job is committed.
278   */
279  public Path getCommittedTaskPath(TaskAttemptContext context) {
280    return getCommittedTaskPath(getAppAttemptId(context), context);
281  }
282  
283  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
284    return getCommittedTaskPath(getAppAttemptId(context), context, out);
285  }
286  
287  /**
288   * Compute the path where the output of a committed task is stored until the
289   * entire job is committed for a specific application attempt.
290   * @param appAttemptId the id of the application attempt to use
291   * @param context the context of any task.
292   * @return the path where the output of a committed task is stored.
293   */
294  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
295    return new Path(getJobAttemptPath(appAttemptId),
296        String.valueOf(context.getTaskAttemptID().getTaskID()));
297  }
298  
299  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
300    return new Path(getJobAttemptPath(appAttemptId, out),
301        String.valueOf(context.getTaskAttemptID().getTaskID()));
302  }
303
304  private static class CommittedTaskFilter implements PathFilter {
305    @Override
306    public boolean accept(Path path) {
307      return !PENDING_DIR_NAME.equals(path.getName());
308    }
309  }
310
311  /**
312   * Get a list of all paths where output from committed tasks are stored.
313   * @param context the context of the current job
314   * @return the list of these Paths/FileStatuses. 
315   * @throws IOException
316   */
317  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
318    throws IOException {
319    Path jobAttemptPath = getJobAttemptPath(context);
320    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
321    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
322  }
323
324  /**
325   * Get the directory that the task should write results into.
326   * @return the work directory
327   * @throws IOException
328   */
329  public Path getWorkPath() throws IOException {
330    return workPath;
331  }
332
333  /**
334   * Create the temporary directory that is the root of all of the task 
335   * work directories.
336   * @param context the job's context
337   */
338  public void setupJob(JobContext context) throws IOException {
339    if (hasOutputPath()) {
340      Path jobAttemptPath = getJobAttemptPath(context);
341      FileSystem fs = jobAttemptPath.getFileSystem(
342          context.getConfiguration());
343      if (!fs.mkdirs(jobAttemptPath)) {
344        LOG.error("Mkdirs failed to create " + jobAttemptPath);
345      }
346    } else {
347      LOG.warn("Output Path is null in setupJob()");
348    }
349  }
350
351  /**
352   * The job has completed, so do works in commitJobInternal().
353   * Could retry on failure if using algorithm 2.
354   * @param context the job's context
355   */
356  public void commitJob(JobContext context) throws IOException {
357    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
358        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
359            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
360    int attempt = 0;
361    boolean jobCommitNotFinished = true;
362    while (jobCommitNotFinished) {
363      try {
364        commitJobInternal(context);
365        jobCommitNotFinished = false;
366      } catch (Exception e) {
367        if (++attempt >= maxAttemptsOnFailure) {
368          throw e;
369        } else {
370          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
371              ") time.", e);
372        }
373      }
374    }
375  }
376
377  /**
378   * The job has completed, so do following commit job, include:
379   * Move all committed tasks to the final output dir (algorithm 1 only).
380   * Delete the temporary directory, including all of the work directories.
381   * Create a _SUCCESS file to make it as successful.
382   * @param context the job's context
383   */
384  @VisibleForTesting
385  protected void commitJobInternal(JobContext context) throws IOException {
386    if (hasOutputPath()) {
387      Path finalOutput = getOutputPath();
388      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
389
390      if (algorithmVersion == 1) {
391        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
392          mergePaths(fs, stat, finalOutput);
393        }
394      }
395
396      if (skipCleanup) {
397        LOG.info("Skip cleanup the _temporary folders under job's output " +
398            "directory in commitJob.");
399      } else {
400        // delete the _temporary folder and create a _done file in the o/p
401        // folder
402        try {
403          cleanupJob(context);
404        } catch (IOException e) {
405          if (ignoreCleanupFailures) {
406            // swallow exceptions in cleanup as user configure to make sure
407            // commitJob could be success even when cleanup get failure.
408            LOG.error("Error in cleanup job, manually cleanup is needed.", e);
409          } else {
410            // throw back exception to fail commitJob.
411            throw e;
412          }
413        }
414      }
415      // True if the job requires output.dir marked on successful job.
416      // Note that by default it is set to true.
417      if (context.getConfiguration().getBoolean(
418          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
419        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
420        // If job commit is repeatable and previous/another AM could write
421        // mark file already, we need to set overwritten to be true explicitly
422        // in case other FS implementations don't overwritten by default.
423        if (isCommitJobRepeatable(context)) {
424          fs.create(markerPath, true).close();
425        } else {
426          fs.create(markerPath).close();
427        }
428      }
429    } else {
430      LOG.warn("Output Path is null in commitJob()");
431    }
432  }
433
434  /**
435   * Merge two paths together.  Anything in from will be moved into to, if there
436   * are any name conflicts while merging the files or directories in from win.
437   * @param fs the File System to use
438   * @param from the path data is coming from.
439   * @param to the path data is going to.
440   * @throws IOException on any error
441   */
442  private void mergePaths(FileSystem fs, final FileStatus from,
443      final Path to) throws IOException {
444    if (LOG.isDebugEnabled()) {
445      LOG.debug("Merging data from " + from + " to " + to);
446    }
447    FileStatus toStat;
448    try {
449      toStat = fs.getFileStatus(to);
450    } catch (FileNotFoundException fnfe) {
451      toStat = null;
452    }
453
454    if (from.isFile()) {
455      if (toStat != null) {
456        if (!fs.delete(to, true)) {
457          throw new IOException("Failed to delete " + to);
458        }
459      }
460
461      if (!fs.rename(from.getPath(), to)) {
462        throw new IOException("Failed to rename " + from + " to " + to);
463      }
464    } else if (from.isDirectory()) {
465      if (toStat != null) {
466        if (!toStat.isDirectory()) {
467          if (!fs.delete(to, true)) {
468            throw new IOException("Failed to delete " + to);
469          }
470          renameOrMerge(fs, from, to);
471        } else {
472          //It is a directory so merge everything in the directories
473          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
474            Path subTo = new Path(to, subFrom.getPath().getName());
475            mergePaths(fs, subFrom, subTo);
476          }
477        }
478      } else {
479        renameOrMerge(fs, from, to);
480      }
481    }
482  }
483
484  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
485      throws IOException {
486    if (algorithmVersion == 1) {
487      if (!fs.rename(from.getPath(), to)) {
488        throw new IOException("Failed to rename " + from + " to " + to);
489      }
490    } else {
491      fs.mkdirs(to);
492      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
493        Path subTo = new Path(to, subFrom.getPath().getName());
494        mergePaths(fs, subFrom, subTo);
495      }
496    }
497  }
498
499  @Override
500  @Deprecated
501  public void cleanupJob(JobContext context) throws IOException {
502    if (hasOutputPath()) {
503      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
504      FileSystem fs = pendingJobAttemptsPath
505          .getFileSystem(context.getConfiguration());
506      // if job allow repeatable commit and pendingJobAttemptsPath could be
507      // deleted by previous AM, we should tolerate FileNotFoundException in
508      // this case.
509      try {
510        fs.delete(pendingJobAttemptsPath, true);
511      } catch (FileNotFoundException e) {
512        if (!isCommitJobRepeatable(context)) {
513          throw e;
514        }
515      }
516    } else {
517      LOG.warn("Output Path is null in cleanupJob()");
518    }
519  }
520
521  /**
522   * Delete the temporary directory, including all of the work directories.
523   * @param context the job's context
524   */
525  @Override
526  public void abortJob(JobContext context, JobStatus.State state) 
527  throws IOException {
528    // delete the _temporary folder
529    cleanupJob(context);
530  }
531  
532  /**
533   * No task setup required.
534   */
535  @Override
536  public void setupTask(TaskAttemptContext context) throws IOException {
537    // FileOutputCommitter's setupTask doesn't do anything. Because the
538    // temporary task directory is created on demand when the 
539    // task is writing.
540  }
541
542  /**
543   * Move the files from the work directory to the job output directory
544   * @param context the task context
545   */
546  @Override
547  public void commitTask(TaskAttemptContext context) 
548  throws IOException {
549    commitTask(context, null);
550  }
551
552  @Private
553  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
554      throws IOException {
555
556    TaskAttemptID attemptId = context.getTaskAttemptID();
557    if (hasOutputPath()) {
558      context.progress();
559      if(taskAttemptPath == null) {
560        taskAttemptPath = getTaskAttemptPath(context);
561      }
562      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
563      FileStatus taskAttemptDirStatus;
564      try {
565        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
566      } catch (FileNotFoundException e) {
567        taskAttemptDirStatus = null;
568      }
569
570      if (taskAttemptDirStatus != null) {
571        if (algorithmVersion == 1) {
572          Path committedTaskPath = getCommittedTaskPath(context);
573          if (fs.exists(committedTaskPath)) {
574             if (!fs.delete(committedTaskPath, true)) {
575               throw new IOException("Could not delete " + committedTaskPath);
576             }
577          }
578          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
579            throw new IOException("Could not rename " + taskAttemptPath + " to "
580                + committedTaskPath);
581          }
582          LOG.info("Saved output of task '" + attemptId + "' to " +
583              committedTaskPath);
584        } else {
585          // directly merge everything from taskAttemptPath to output directory
586          mergePaths(fs, taskAttemptDirStatus, outputPath);
587          LOG.info("Saved output of task '" + attemptId + "' to " +
588              outputPath);
589        }
590      } else {
591        LOG.warn("No Output found for " + attemptId);
592      }
593    } else {
594      LOG.warn("Output Path is null in commitTask()");
595    }
596  }
597
598  /**
599   * Delete the work directory
600   * @throws IOException 
601   */
602  @Override
603  public void abortTask(TaskAttemptContext context) throws IOException {
604    abortTask(context, null);
605  }
606
607  @Private
608  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
609    if (hasOutputPath()) { 
610      context.progress();
611      if(taskAttemptPath == null) {
612        taskAttemptPath = getTaskAttemptPath(context);
613      }
614      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
615      if(!fs.delete(taskAttemptPath, true)) {
616        LOG.warn("Could not delete "+taskAttemptPath);
617      }
618    } else {
619      LOG.warn("Output Path is null in abortTask()");
620    }
621  }
622
623  /**
624   * Did this task write any files in the work directory?
625   * @param context the task's context
626   */
627  @Override
628  public boolean needsTaskCommit(TaskAttemptContext context
629                                 ) throws IOException {
630    return needsTaskCommit(context, null);
631  }
632
633  @Private
634  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
635    ) throws IOException {
636    if(hasOutputPath()) {
637      if(taskAttemptPath == null) {
638        taskAttemptPath = getTaskAttemptPath(context);
639      }
640      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
641      return fs.exists(taskAttemptPath);
642    }
643    return false;
644  }
645
646  @Override
647  @Deprecated
648  public boolean isRecoverySupported() {
649    return true;
650  }
651
652  @Override
653  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
654    return algorithmVersion == 2;
655  }
656
657  @Override
658  public void recoverTask(TaskAttemptContext context)
659      throws IOException {
660    if(hasOutputPath()) {
661      context.progress();
662      TaskAttemptID attemptId = context.getTaskAttemptID();
663      int previousAttempt = getAppAttemptId(context) - 1;
664      if (previousAttempt < 0) {
665        throw new IOException ("Cannot recover task output for first attempt...");
666      }
667
668      Path previousCommittedTaskPath = getCommittedTaskPath(
669          previousAttempt, context);
670      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
671      if (LOG.isDebugEnabled()) {
672        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
673      }
674      if (algorithmVersion == 1) {
675        if (fs.exists(previousCommittedTaskPath)) {
676          Path committedTaskPath = getCommittedTaskPath(context);
677          if (fs.exists(committedTaskPath)) {
678            if (!fs.delete(committedTaskPath, true)) {
679              throw new IOException("Could not delete "+committedTaskPath);
680            }
681          }
682          //Rename can fail if the parent directory does not yet exist.
683          Path committedParent = committedTaskPath.getParent();
684          fs.mkdirs(committedParent);
685          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
686            throw new IOException("Could not rename " + previousCommittedTaskPath +
687                " to " + committedTaskPath);
688          }
689        } else {
690            LOG.warn(attemptId+" had no output to recover.");
691        }
692      } else {
693        // essentially a no-op, but for backwards compatibility
694        // after upgrade to the new fileOutputCommitter,
695        // check if there are any output left in committedTaskPath
696        if (fs.exists(previousCommittedTaskPath)) {
697          LOG.info("Recovering task for upgrading scenario, moving files from "
698              + previousCommittedTaskPath + " to " + outputPath);
699          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
700          mergePaths(fs, from, outputPath);
701        }
702        LOG.info("Done recovering task " + attemptId);
703      }
704    } else {
705      LOG.warn("Output Path is null in recoverTask()");
706    }
707  }
708}