001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceAudience.Private;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.PathFilter;
034import org.apache.hadoop.mapreduce.JobContext;
035import org.apache.hadoop.mapreduce.JobStatus;
036import org.apache.hadoop.mapreduce.MRJobConfig;
037import org.apache.hadoop.mapreduce.OutputCommitter;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.TaskAttemptID;
040
041import com.google.common.annotations.VisibleForTesting;
042
/** An {@link OutputCommitter} that commits files to the job output
 * directory, i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 **/
046@InterfaceAudience.Public
047@InterfaceStability.Stable
048public class FileOutputCommitter extends OutputCommitter {
  /** Logger for this class. */
  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);

  /** 
   * Name of directory where pending data is placed.  Data that has not been
   * committed yet.
   */
  public static final String PENDING_DIR_NAME = "_temporary";
  /**
   * Temporary directory name 
   *
   * The static variable to be compatible with M/R 1.x
   */
  @Deprecated
  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
  /** Name of the marker file a job commit creates in the output directory. */
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
  /**
   * Configuration key controlling whether a job commit writes the
   * {@link #SUCCEEDED_FILE_NAME} marker.  It is read with a default of true.
   */
  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
  /**
   * Configuration key selecting the commit algorithm version.  The
   * constructor rejects any value other than 1 or 2.
   */
  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
      "mapreduce.fileoutputcommitter.algorithm.version";
  /** Algorithm version used when the configuration does not set one. */
  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
  // Number of attempts when failure happens in commit job
  // (only consulted by commitJob() when the job commit is repeatable)
  public static final String FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS =
      "mapreduce.fileoutputcommitter.failures.attempts";

  // default value to be 1 to keep consistent with previous behavior
  public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
  // Qualified job output path; stays null when the committer is built
  // without an output path.
  private Path outputPath = null;
  // Task attempt work directory; only assigned by the TaskAttemptContext
  // constructor, and only when an output path was supplied.
  private Path workPath = null;
  // Commit algorithm version read from the configuration (1 or 2).
  private final int algorithmVersion;
078
079  /**
080   * Create a file output committer
081   * @param outputPath the job's output path, or null if you want the output
082   * committer to act as a noop.
083   * @param context the task's context
084   * @throws IOException
085   */
086  public FileOutputCommitter(Path outputPath, 
087                             TaskAttemptContext context) throws IOException {
088    this(outputPath, (JobContext)context);
089    if (outputPath != null) {
090      workPath = getTaskAttemptPath(context, outputPath);
091    }
092  }
093  
094  /**
095   * Create a file output committer
096   * @param outputPath the job's output path, or null if you want the output
097   * committer to act as a noop.
098   * @param context the task's context
099   * @throws IOException
100   */
101  @Private
102  public FileOutputCommitter(Path outputPath, 
103                             JobContext context) throws IOException {
104    Configuration conf = context.getConfiguration();
105    algorithmVersion =
106        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
107                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
108    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
109    if (algorithmVersion != 1 && algorithmVersion != 2) {
110      throw new IOException("Only 1 or 2 algorithm version is supported");
111    }
112    if (outputPath != null) {
113      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
114      this.outputPath = fs.makeQualified(outputPath);
115    }
116  }
117  
  /**
   * @return the path where final output of the job should be placed.  This
   * could also be considered the committed application attempt path.
   * May be null: the field is only assigned when a non-null output path is
   * given to the constructor.
   */
  private Path getOutputPath() {
    return this.outputPath;
  }
125  
  /**
   * Tell whether this committer was built with an output path.
   *
   * @return true if we have an output path set, else false.
   */
  private boolean hasOutputPath() {
    return this.outputPath != null;
  }
132  
133  /**
134   * @return the path where the output of pending job attempts are
135   * stored.
136   */
137  private Path getPendingJobAttemptsPath() {
138    return getPendingJobAttemptsPath(getOutputPath());
139  }
140  
141  /**
142   * Get the location of pending job attempts.
143   * @param out the base output directory.
144   * @return the location of pending job attempts.
145   */
146  private static Path getPendingJobAttemptsPath(Path out) {
147    return new Path(out, PENDING_DIR_NAME);
148  }
149  
150  /**
151   * Get the Application Attempt Id for this job
152   * @param context the context to look in
153   * @return the Application Attempt Id for a given job.
154   */
155  private static int getAppAttemptId(JobContext context) {
156    return context.getConfiguration().getInt(
157        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
158  }
159  
160  /**
161   * Compute the path where the output of a given job attempt will be placed. 
162   * @param context the context of the job.  This is used to get the
163   * application attempt id.
164   * @return the path to store job attempt data.
165   */
166  public Path getJobAttemptPath(JobContext context) {
167    return getJobAttemptPath(context, getOutputPath());
168  }
169  
170  /**
171   * Compute the path where the output of a given job attempt will be placed. 
172   * @param context the context of the job.  This is used to get the
173   * application attempt id.
174   * @param out the output path to place these in.
175   * @return the path to store job attempt data.
176   */
177  public static Path getJobAttemptPath(JobContext context, Path out) {
178    return getJobAttemptPath(getAppAttemptId(context), out);
179  }
180  
181  /**
182   * Compute the path where the output of a given job attempt will be placed. 
183   * @param appAttemptId the ID of the application attempt for this job.
184   * @return the path to store job attempt data.
185   */
186  protected Path getJobAttemptPath(int appAttemptId) {
187    return getJobAttemptPath(appAttemptId, getOutputPath());
188  }
189  
190  /**
191   * Compute the path where the output of a given job attempt will be placed. 
192   * @param appAttemptId the ID of the application attempt for this job.
193   * @return the path to store job attempt data.
194   */
195  private static Path getJobAttemptPath(int appAttemptId, Path out) {
196    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
197  }
198  
199  /**
200   * Compute the path where the output of pending task attempts are stored.
201   * @param context the context of the job with pending tasks. 
202   * @return the path where the output of pending task attempts are stored.
203   */
204  private Path getPendingTaskAttemptsPath(JobContext context) {
205    return getPendingTaskAttemptsPath(context, getOutputPath());
206  }
207  
208  /**
209   * Compute the path where the output of pending task attempts are stored.
210   * @param context the context of the job with pending tasks. 
211   * @return the path where the output of pending task attempts are stored.
212   */
213  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
214    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
215  }
216  
217  /**
218   * Compute the path where the output of a task attempt is stored until
219   * that task is committed.
220   * 
221   * @param context the context of the task attempt.
222   * @return the path where a task attempt should be stored.
223   */
224  public Path getTaskAttemptPath(TaskAttemptContext context) {
225    return new Path(getPendingTaskAttemptsPath(context), 
226        String.valueOf(context.getTaskAttemptID()));
227  }
228  
229  /**
230   * Compute the path where the output of a task attempt is stored until
231   * that task is committed.
232   * 
233   * @param context the context of the task attempt.
234   * @param out The output path to put things in.
235   * @return the path where a task attempt should be stored.
236   */
237  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
238    return new Path(getPendingTaskAttemptsPath(context, out), 
239        String.valueOf(context.getTaskAttemptID()));
240  }
241  
242  /**
243   * Compute the path where the output of a committed task is stored until
244   * the entire job is committed.
245   * @param context the context of the task attempt
246   * @return the path where the output of a committed task is stored until
247   * the entire job is committed.
248   */
249  public Path getCommittedTaskPath(TaskAttemptContext context) {
250    return getCommittedTaskPath(getAppAttemptId(context), context);
251  }
252  
253  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
254    return getCommittedTaskPath(getAppAttemptId(context), context, out);
255  }
256  
257  /**
258   * Compute the path where the output of a committed task is stored until the
259   * entire job is committed for a specific application attempt.
260   * @param appAttemptId the id of the application attempt to use
261   * @param context the context of any task.
262   * @return the path where the output of a committed task is stored.
263   */
264  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
265    return new Path(getJobAttemptPath(appAttemptId),
266        String.valueOf(context.getTaskAttemptID().getTaskID()));
267  }
268  
269  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
270    return new Path(getJobAttemptPath(appAttemptId, out),
271        String.valueOf(context.getTaskAttemptID().getTaskID()));
272  }
273
274  private static class CommittedTaskFilter implements PathFilter {
275    @Override
276    public boolean accept(Path path) {
277      return !PENDING_DIR_NAME.equals(path.getName());
278    }
279  }
280
281  /**
282   * Get a list of all paths where output from committed tasks are stored.
283   * @param context the context of the current job
284   * @return the list of these Paths/FileStatuses. 
285   * @throws IOException
286   */
287  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
288    throws IOException {
289    Path jobAttemptPath = getJobAttemptPath(context);
290    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
291    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
292  }
293
  /**
   * Get the directory that the task should write results into.
   * The value is only set by the task-attempt constructor when an output
   * path was supplied; otherwise this returns null.
   * @return the work directory
   * @throws IOException declared in the signature; this implementation
   * performs no I/O itself
   */
  public Path getWorkPath() throws IOException {
    return workPath;
  }
302
303  /**
304   * Create the temporary directory that is the root of all of the task 
305   * work directories.
306   * @param context the job's context
307   */
308  public void setupJob(JobContext context) throws IOException {
309    if (hasOutputPath()) {
310      Path jobAttemptPath = getJobAttemptPath(context);
311      FileSystem fs = jobAttemptPath.getFileSystem(
312          context.getConfiguration());
313      if (!fs.mkdirs(jobAttemptPath)) {
314        LOG.error("Mkdirs failed to create " + jobAttemptPath);
315      }
316    } else {
317      LOG.warn("Output Path is null in setupJob()");
318    }
319  }
320
321  /**
322   * The job has completed, so do works in commitJobInternal().
323   * Could retry on failure if using algorithm 2.
324   * @param context the job's context
325   */
326  public void commitJob(JobContext context) throws IOException {
327    int maxAttemptsOnFailure = isCommitJobRepeatable(context) ?
328        context.getConfiguration().getInt(FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS,
329            FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT) : 1;
330    int attempt = 0;
331    boolean jobCommitNotFinished = true;
332    while (jobCommitNotFinished) {
333      try {
334        commitJobInternal(context);
335        jobCommitNotFinished = false;
336      } catch (Exception e) {
337        if (++attempt >= maxAttemptsOnFailure) {
338          throw e;
339        } else {
340          LOG.warn("Exception get thrown in job commit, retry (" + attempt +
341              ") time.", e);
342        }
343      }
344    }
345  }
346
347  /**
348   * The job has completed, so do following commit job, include:
349   * Move all committed tasks to the final output dir (algorithm 1 only).
350   * Delete the temporary directory, including all of the work directories.
351   * Create a _SUCCESS file to make it as successful.
352   * @param context the job's context
353   */
354  @VisibleForTesting
355  protected void commitJobInternal(JobContext context) throws IOException {
356    if (hasOutputPath()) {
357      Path finalOutput = getOutputPath();
358      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
359
360      if (algorithmVersion == 1) {
361        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
362          mergePaths(fs, stat, finalOutput);
363        }
364      }
365
366      // delete the _temporary folder and create a _done file in the o/p folder
367      cleanupJob(context);
368      // True if the job requires output.dir marked on successful job.
369      // Note that by default it is set to true.
370      if (context.getConfiguration().getBoolean(
371          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
372        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
373        // If job commit is repeatable and previous/another AM could write
374        // mark file already, we need to set overwritten to be true explicitly
375        // in case other FS implementations don't overwritten by default.
376        if (isCommitJobRepeatable(context)) {
377          fs.create(markerPath, true).close();
378        } else {
379          fs.create(markerPath).close();
380        }
381      }
382    } else {
383      LOG.warn("Output Path is null in commitJob()");
384    }
385  }
386
387  /**
388   * Merge two paths together.  Anything in from will be moved into to, if there
389   * are any name conflicts while merging the files or directories in from win.
390   * @param fs the File System to use
391   * @param from the path data is coming from.
392   * @param to the path data is going to.
393   * @throws IOException on any error
394   */
395  private void mergePaths(FileSystem fs, final FileStatus from,
396      final Path to) throws IOException {
397    if (LOG.isDebugEnabled()) {
398      LOG.debug("Merging data from " + from + " to " + to);
399    }
400    FileStatus toStat;
401    try {
402      toStat = fs.getFileStatus(to);
403    } catch (FileNotFoundException fnfe) {
404      toStat = null;
405    }
406
407    if (from.isFile()) {
408      if (toStat != null) {
409        if (!fs.delete(to, true)) {
410          throw new IOException("Failed to delete " + to);
411        }
412      }
413
414      if (!fs.rename(from.getPath(), to)) {
415        throw new IOException("Failed to rename " + from + " to " + to);
416      }
417    } else if (from.isDirectory()) {
418      if (toStat != null) {
419        if (!toStat.isDirectory()) {
420          if (!fs.delete(to, true)) {
421            throw new IOException("Failed to delete " + to);
422          }
423          renameOrMerge(fs, from, to);
424        } else {
425          //It is a directory so merge everything in the directories
426          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
427            Path subTo = new Path(to, subFrom.getPath().getName());
428            mergePaths(fs, subFrom, subTo);
429          }
430        }
431      } else {
432        renameOrMerge(fs, from, to);
433      }
434    }
435  }
436
437  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
438      throws IOException {
439    if (algorithmVersion == 1) {
440      if (!fs.rename(from.getPath(), to)) {
441        throw new IOException("Failed to rename " + from + " to " + to);
442      }
443    } else {
444      fs.mkdirs(to);
445      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
446        Path subTo = new Path(to, subFrom.getPath().getName());
447        mergePaths(fs, subFrom, subTo);
448      }
449    }
450  }
451
452  @Override
453  @Deprecated
454  public void cleanupJob(JobContext context) throws IOException {
455    if (hasOutputPath()) {
456      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
457      FileSystem fs = pendingJobAttemptsPath
458          .getFileSystem(context.getConfiguration());
459      // if job allow repeatable commit and pendingJobAttemptsPath could be
460      // deleted by previous AM, we should tolerate FileNotFoundException in
461      // this case.
462      try {
463        fs.delete(pendingJobAttemptsPath, true);
464      } catch (FileNotFoundException e) {
465        if (!isCommitJobRepeatable(context)) {
466          throw e;
467        }
468      }
469    } else {
470      LOG.warn("Output Path is null in cleanupJob()");
471    }
472  }
473
  /**
   * Delete the temporary directory, including all of the work directories.
   * This simply delegates to {@link #cleanupJob(JobContext)}; the job state
   * is not consulted.
   * @param context the job's context
   * @param state the final state of the job (unused here)
   * @throws IOException if the cleanup fails
   */
  @Override
  public void abortJob(JobContext context, JobStatus.State state) 
  throws IOException {
    // delete the _temporary folder
    cleanupJob(context);
  }
484  
  /**
   * No task setup required.
   * @param context the task's context (unused)
   * @throws IOException declared in the signature; never thrown by this
   * empty implementation
   */
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    // FileOutputCommitter's setupTask doesn't do anything. Because the
    // temporary task directory is created on demand when the 
    // task is writing.
  }
494
  /**
   * Move the files from the work directory to the job output directory.
   * Delegates to the two-argument overload with a null task attempt path,
   * which makes that overload derive the path from the context.
   * @param context the task context
   * @throws IOException if the commit fails
   */
  @Override
  public void commitTask(TaskAttemptContext context) 
  throws IOException {
    commitTask(context, null);
  }
504
505  @Private
506  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
507      throws IOException {
508
509    TaskAttemptID attemptId = context.getTaskAttemptID();
510    if (hasOutputPath()) {
511      context.progress();
512      if(taskAttemptPath == null) {
513        taskAttemptPath = getTaskAttemptPath(context);
514      }
515      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
516      FileStatus taskAttemptDirStatus;
517      try {
518        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
519      } catch (FileNotFoundException e) {
520        taskAttemptDirStatus = null;
521      }
522
523      if (taskAttemptDirStatus != null) {
524        if (algorithmVersion == 1) {
525          Path committedTaskPath = getCommittedTaskPath(context);
526          if (fs.exists(committedTaskPath)) {
527             if (!fs.delete(committedTaskPath, true)) {
528               throw new IOException("Could not delete " + committedTaskPath);
529             }
530          }
531          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
532            throw new IOException("Could not rename " + taskAttemptPath + " to "
533                + committedTaskPath);
534          }
535          LOG.info("Saved output of task '" + attemptId + "' to " +
536              committedTaskPath);
537        } else {
538          // directly merge everything from taskAttemptPath to output directory
539          mergePaths(fs, taskAttemptDirStatus, outputPath);
540          LOG.info("Saved output of task '" + attemptId + "' to " +
541              outputPath);
542        }
543      } else {
544        LOG.warn("No Output found for " + attemptId);
545      }
546    } else {
547      LOG.warn("Output Path is null in commitTask()");
548    }
549  }
550
  /**
   * Delete the work directory.
   * Delegates to the two-argument overload with a null task attempt path,
   * which makes that overload derive the path from the context.
   * @param context the task context
   * @throws IOException if the delete throws
   */
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    abortTask(context, null);
  }
559
560  @Private
561  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
562    if (hasOutputPath()) { 
563      context.progress();
564      if(taskAttemptPath == null) {
565        taskAttemptPath = getTaskAttemptPath(context);
566      }
567      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
568      if(!fs.delete(taskAttemptPath, true)) {
569        LOG.warn("Could not delete "+taskAttemptPath);
570      }
571    } else {
572      LOG.warn("Output Path is null in abortTask()");
573    }
574  }
575
  /**
   * Did this task write any files in the work directory?
   * Delegates to the two-argument overload with a null task attempt path,
   * which makes that overload derive the path from the context.
   * @param context the task's context
   * @return the result of the two-argument overload
   * @throws IOException if the existence check throws
   */
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context
                                 ) throws IOException {
    return needsTaskCommit(context, null);
  }
585
586  @Private
587  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
588    ) throws IOException {
589    if(hasOutputPath()) {
590      if(taskAttemptPath == null) {
591        taskAttemptPath = getTaskAttemptPath(context);
592      }
593      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
594      return fs.exists(taskAttemptPath);
595    }
596    return false;
597  }
598
  /**
   * Report whether task recovery is supported.
   * @return always true for this committer
   */
  @Override
  @Deprecated
  public boolean isRecoverySupported() {
    return true;
  }
604
  /**
   * Report whether the job commit may be attempted more than once.
   * @param context the job's context (not consulted here)
   * @return true only when the committer uses algorithm version 2
   * @throws IOException declared in the signature; never thrown by this
   * implementation
   */
  @Override
  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
    return algorithmVersion == 2;
  }
609
610  @Override
611  public void recoverTask(TaskAttemptContext context)
612      throws IOException {
613    if(hasOutputPath()) {
614      context.progress();
615      TaskAttemptID attemptId = context.getTaskAttemptID();
616      int previousAttempt = getAppAttemptId(context) - 1;
617      if (previousAttempt < 0) {
618        throw new IOException ("Cannot recover task output for first attempt...");
619      }
620
621      Path previousCommittedTaskPath = getCommittedTaskPath(
622          previousAttempt, context);
623      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
624      if (LOG.isDebugEnabled()) {
625        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
626      }
627      if (algorithmVersion == 1) {
628        if (fs.exists(previousCommittedTaskPath)) {
629          Path committedTaskPath = getCommittedTaskPath(context);
630          if (fs.exists(committedTaskPath)) {
631            if (!fs.delete(committedTaskPath, true)) {
632              throw new IOException("Could not delete "+committedTaskPath);
633            }
634          }
635          //Rename can fail if the parent directory does not yet exist.
636          Path committedParent = committedTaskPath.getParent();
637          fs.mkdirs(committedParent);
638          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
639            throw new IOException("Could not rename " + previousCommittedTaskPath +
640                " to " + committedTaskPath);
641          }
642        } else {
643            LOG.warn(attemptId+" had no output to recover.");
644        }
645      } else {
646        // essentially a no-op, but for backwards compatibility
647        // after upgrade to the new fileOutputCommitter,
648        // check if there are any output left in committedTaskPath
649        if (fs.exists(previousCommittedTaskPath)) {
650          LOG.info("Recovering task for upgrading scenario, moving files from "
651              + previousCommittedTaskPath + " to " + outputPath);
652          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
653          mergePaths(fs, from, outputPath);
654        }
655        LOG.info("Done recovering task " + attemptId);
656      }
657    } else {
658      LOG.warn("Output Path is null in recoverTask()");
659    }
660  }
661}