001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceAudience.Private;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.PathFilter;
034import org.apache.hadoop.mapreduce.JobContext;
035import org.apache.hadoop.mapreduce.JobStatus;
036import org.apache.hadoop.mapreduce.MRJobConfig;
037import org.apache.hadoop.mapreduce.OutputCommitter;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.TaskAttemptID;
040
/**
 * An {@link OutputCommitter} that commits files to the job output directory,
 * i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 */
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public class FileOutputCommitter extends OutputCommitter {
  /** Logger shared by every instance of this committer. */
  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);

  /**
   * Name of directory where pending data is placed.  Data that has not been
   * committed yet.
   */
  public static final String PENDING_DIR_NAME = "_temporary";
  /**
   * Temporary directory name.
   *
   * The static variable to be compatible with M/R 1.x
   *
   * @deprecated alias holding the same value as {@link #PENDING_DIR_NAME}.
   */
  @Deprecated
  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
  /**
   * Name of the empty marker file that {@code commitJob} creates directly
   * under the output path when success marking is enabled.
   */
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
  /**
   * Boolean configuration key consulted by {@code commitJob}; when it
   * resolves to true (the default used there) the
   * {@link #SUCCEEDED_FILE_NAME} marker is written.
   */
  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
  /**
   * Integer configuration key selecting the commit algorithm.  The
   * constructor accepts only the values 1 and 2.
   */
  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
      "mapreduce.fileoutputcommitter.algorithm.version";
  /** Algorithm version used when the configuration key is not set. */
  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
  /**
   * Fully qualified job output path, or null when the committer was built
   * without an output path (in which case the commit operations only log).
   */
  private Path outputPath = null;
  /**
   * Directory a task attempt writes into.  Only assigned by the
   * task-attempt constructor, and only when an output path was given.
   */
  private Path workPath = null;
  /**
   * Commit algorithm in effect (1 or 2).  With 1, commitTask renames the
   * attempt directory to a committed-task directory and commitJob merges
   * those into the output path; with 2, commitTask merges the attempt
   * directory into the output path directly.
   */
  private final int algorithmVersion;
070
071  /**
072   * Create a file output committer
073   * @param outputPath the job's output path, or null if you want the output
074   * committer to act as a noop.
075   * @param context the task's context
076   * @throws IOException
077   */
078  public FileOutputCommitter(Path outputPath, 
079                             TaskAttemptContext context) throws IOException {
080    this(outputPath, (JobContext)context);
081    if (outputPath != null) {
082      workPath = getTaskAttemptPath(context, outputPath);
083    }
084  }
085  
086  /**
087   * Create a file output committer
088   * @param outputPath the job's output path, or null if you want the output
089   * committer to act as a noop.
090   * @param context the task's context
091   * @throws IOException
092   */
093  @Private
094  public FileOutputCommitter(Path outputPath, 
095                             JobContext context) throws IOException {
096    Configuration conf = context.getConfiguration();
097    algorithmVersion =
098        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
099                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
100    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
101    if (algorithmVersion != 1 && algorithmVersion != 2) {
102      throw new IOException("Only 1 or 2 algorithm version is supported");
103    }
104    if (outputPath != null) {
105      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
106      this.outputPath = fs.makeQualified(outputPath);
107    }
108  }
109  
110  /**
111   * @return the path where final output of the job should be placed.  This
112   * could also be considered the committed application attempt path.
113   */
114  private Path getOutputPath() {
115    return this.outputPath;
116  }
117  
118  /**
119   * @return true if we have an output path set, else false.
120   */
121  private boolean hasOutputPath() {
122    return this.outputPath != null;
123  }
124  
125  /**
126   * @return the path where the output of pending job attempts are
127   * stored.
128   */
129  private Path getPendingJobAttemptsPath() {
130    return getPendingJobAttemptsPath(getOutputPath());
131  }
132  
133  /**
134   * Get the location of pending job attempts.
135   * @param out the base output directory.
136   * @return the location of pending job attempts.
137   */
138  private static Path getPendingJobAttemptsPath(Path out) {
139    return new Path(out, PENDING_DIR_NAME);
140  }
141  
142  /**
143   * Get the Application Attempt Id for this job
144   * @param context the context to look in
145   * @return the Application Attempt Id for a given job.
146   */
147  private static int getAppAttemptId(JobContext context) {
148    return context.getConfiguration().getInt(
149        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
150  }
151  
152  /**
153   * Compute the path where the output of a given job attempt will be placed. 
154   * @param context the context of the job.  This is used to get the
155   * application attempt id.
156   * @return the path to store job attempt data.
157   */
158  public Path getJobAttemptPath(JobContext context) {
159    return getJobAttemptPath(context, getOutputPath());
160  }
161  
162  /**
163   * Compute the path where the output of a given job attempt will be placed. 
164   * @param context the context of the job.  This is used to get the
165   * application attempt id.
166   * @param out the output path to place these in.
167   * @return the path to store job attempt data.
168   */
169  public static Path getJobAttemptPath(JobContext context, Path out) {
170    return getJobAttemptPath(getAppAttemptId(context), out);
171  }
172  
173  /**
174   * Compute the path where the output of a given job attempt will be placed. 
175   * @param appAttemptId the ID of the application attempt for this job.
176   * @return the path to store job attempt data.
177   */
178  protected Path getJobAttemptPath(int appAttemptId) {
179    return getJobAttemptPath(appAttemptId, getOutputPath());
180  }
181  
182  /**
183   * Compute the path where the output of a given job attempt will be placed. 
184   * @param appAttemptId the ID of the application attempt for this job.
185   * @return the path to store job attempt data.
186   */
187  private static Path getJobAttemptPath(int appAttemptId, Path out) {
188    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
189  }
190  
191  /**
192   * Compute the path where the output of pending task attempts are stored.
193   * @param context the context of the job with pending tasks. 
194   * @return the path where the output of pending task attempts are stored.
195   */
196  private Path getPendingTaskAttemptsPath(JobContext context) {
197    return getPendingTaskAttemptsPath(context, getOutputPath());
198  }
199  
200  /**
201   * Compute the path where the output of pending task attempts are stored.
202   * @param context the context of the job with pending tasks. 
203   * @return the path where the output of pending task attempts are stored.
204   */
205  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
206    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
207  }
208  
209  /**
210   * Compute the path where the output of a task attempt is stored until
211   * that task is committed.
212   * 
213   * @param context the context of the task attempt.
214   * @return the path where a task attempt should be stored.
215   */
216  public Path getTaskAttemptPath(TaskAttemptContext context) {
217    return new Path(getPendingTaskAttemptsPath(context), 
218        String.valueOf(context.getTaskAttemptID()));
219  }
220  
221  /**
222   * Compute the path where the output of a task attempt is stored until
223   * that task is committed.
224   * 
225   * @param context the context of the task attempt.
226   * @param out The output path to put things in.
227   * @return the path where a task attempt should be stored.
228   */
229  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
230    return new Path(getPendingTaskAttemptsPath(context, out), 
231        String.valueOf(context.getTaskAttemptID()));
232  }
233  
234  /**
235   * Compute the path where the output of a committed task is stored until
236   * the entire job is committed.
237   * @param context the context of the task attempt
238   * @return the path where the output of a committed task is stored until
239   * the entire job is committed.
240   */
241  public Path getCommittedTaskPath(TaskAttemptContext context) {
242    return getCommittedTaskPath(getAppAttemptId(context), context);
243  }
244  
245  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
246    return getCommittedTaskPath(getAppAttemptId(context), context, out);
247  }
248  
249  /**
250   * Compute the path where the output of a committed task is stored until the
251   * entire job is committed for a specific application attempt.
252   * @param appAttemptId the id of the application attempt to use
253   * @param context the context of any task.
254   * @return the path where the output of a committed task is stored.
255   */
256  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
257    return new Path(getJobAttemptPath(appAttemptId),
258        String.valueOf(context.getTaskAttemptID().getTaskID()));
259  }
260  
261  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
262    return new Path(getJobAttemptPath(appAttemptId, out),
263        String.valueOf(context.getTaskAttemptID().getTaskID()));
264  }
265
266  private static class CommittedTaskFilter implements PathFilter {
267    @Override
268    public boolean accept(Path path) {
269      return !PENDING_DIR_NAME.equals(path.getName());
270    }
271  }
272
273  /**
274   * Get a list of all paths where output from committed tasks are stored.
275   * @param context the context of the current job
276   * @return the list of these Paths/FileStatuses. 
277   * @throws IOException
278   */
279  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
280    throws IOException {
281    Path jobAttemptPath = getJobAttemptPath(context);
282    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
283    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
284  }
285
286  /**
287   * Get the directory that the task should write results into.
288   * @return the work directory
289   * @throws IOException
290   */
291  public Path getWorkPath() throws IOException {
292    return workPath;
293  }
294
295  /**
296   * Create the temporary directory that is the root of all of the task 
297   * work directories.
298   * @param context the job's context
299   */
300  public void setupJob(JobContext context) throws IOException {
301    if (hasOutputPath()) {
302      Path jobAttemptPath = getJobAttemptPath(context);
303      FileSystem fs = jobAttemptPath.getFileSystem(
304          context.getConfiguration());
305      if (!fs.mkdirs(jobAttemptPath)) {
306        LOG.error("Mkdirs failed to create " + jobAttemptPath);
307      }
308    } else {
309      LOG.warn("Output Path is null in setupJob()");
310    }
311  }
312
313  /**
314   * The job has completed so move all committed tasks to the final output dir.
315   * Delete the temporary directory, including all of the work directories.
316   * Create a _SUCCESS file to make it as successful.
317   * @param context the job's context
318   */
319  public void commitJob(JobContext context) throws IOException {
320    if (hasOutputPath()) {
321      Path finalOutput = getOutputPath();
322      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
323
324      if (algorithmVersion == 1) {
325        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
326          mergePaths(fs, stat, finalOutput);
327        }
328      }
329
330      // delete the _temporary folder and create a _done file in the o/p folder
331      cleanupJob(context);
332      // True if the job requires output.dir marked on successful job.
333      // Note that by default it is set to true.
334      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
335        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
336        fs.create(markerPath).close();
337      }
338    } else {
339      LOG.warn("Output Path is null in commitJob()");
340    }
341  }
342
343  /**
344   * Merge two paths together.  Anything in from will be moved into to, if there
345   * are any name conflicts while merging the files or directories in from win.
346   * @param fs the File System to use
347   * @param from the path data is coming from.
348   * @param to the path data is going to.
349   * @throws IOException on any error
350   */
351  private void mergePaths(FileSystem fs, final FileStatus from,
352      final Path to) throws IOException {
353    if (LOG.isDebugEnabled()) {
354      LOG.debug("Merging data from " + from + " to " + to);
355    }
356    FileStatus toStat;
357    try {
358      toStat = fs.getFileStatus(to);
359    } catch (FileNotFoundException fnfe) {
360      toStat = null;
361    }
362
363    if (from.isFile()) {
364      if (toStat != null) {
365        if (!fs.delete(to, true)) {
366          throw new IOException("Failed to delete " + to);
367        }
368      }
369
370      if (!fs.rename(from.getPath(), to)) {
371        throw new IOException("Failed to rename " + from + " to " + to);
372      }
373    } else if (from.isDirectory()) {
374      if (toStat != null) {
375        if (!toStat.isDirectory()) {
376          if (!fs.delete(to, true)) {
377            throw new IOException("Failed to delete " + to);
378          }
379          renameOrMerge(fs, from, to);
380        } else {
381          //It is a directory so merge everything in the directories
382          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
383            Path subTo = new Path(to, subFrom.getPath().getName());
384            mergePaths(fs, subFrom, subTo);
385          }
386        }
387      } else {
388        renameOrMerge(fs, from, to);
389      }
390    }
391  }
392
393  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
394      throws IOException {
395    if (algorithmVersion == 1) {
396      if (!fs.rename(from.getPath(), to)) {
397        throw new IOException("Failed to rename " + from + " to " + to);
398      }
399    } else {
400      fs.mkdirs(to);
401      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
402        Path subTo = new Path(to, subFrom.getPath().getName());
403        mergePaths(fs, subFrom, subTo);
404      }
405    }
406  }
407
408  @Override
409  @Deprecated
410  public void cleanupJob(JobContext context) throws IOException {
411    if (hasOutputPath()) {
412      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
413      FileSystem fs = pendingJobAttemptsPath
414          .getFileSystem(context.getConfiguration());
415      fs.delete(pendingJobAttemptsPath, true);
416    } else {
417      LOG.warn("Output Path is null in cleanupJob()");
418    }
419  }
420
421  /**
422   * Delete the temporary directory, including all of the work directories.
423   * @param context the job's context
424   */
425  @Override
426  public void abortJob(JobContext context, JobStatus.State state) 
427  throws IOException {
428    // delete the _temporary folder
429    cleanupJob(context);
430  }
431  
432  /**
433   * No task setup required.
434   */
435  @Override
436  public void setupTask(TaskAttemptContext context) throws IOException {
437    // FileOutputCommitter's setupTask doesn't do anything. Because the
438    // temporary task directory is created on demand when the 
439    // task is writing.
440  }
441
442  /**
443   * Move the files from the work directory to the job output directory
444   * @param context the task context
445   */
446  @Override
447  public void commitTask(TaskAttemptContext context) 
448  throws IOException {
449    commitTask(context, null);
450  }
451
452  @Private
453  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
454      throws IOException {
455
456    TaskAttemptID attemptId = context.getTaskAttemptID();
457    if (hasOutputPath()) {
458      context.progress();
459      if(taskAttemptPath == null) {
460        taskAttemptPath = getTaskAttemptPath(context);
461      }
462      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
463      FileStatus taskAttemptDirStatus;
464      try {
465        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
466      } catch (FileNotFoundException e) {
467        taskAttemptDirStatus = null;
468      }
469
470      if (taskAttemptDirStatus != null) {
471        if (algorithmVersion == 1) {
472          Path committedTaskPath = getCommittedTaskPath(context);
473          if (fs.exists(committedTaskPath)) {
474             if (!fs.delete(committedTaskPath, true)) {
475               throw new IOException("Could not delete " + committedTaskPath);
476             }
477          }
478          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
479            throw new IOException("Could not rename " + taskAttemptPath + " to "
480                + committedTaskPath);
481          }
482          LOG.info("Saved output of task '" + attemptId + "' to " +
483              committedTaskPath);
484        } else {
485          // directly merge everything from taskAttemptPath to output directory
486          mergePaths(fs, taskAttemptDirStatus, outputPath);
487          LOG.info("Saved output of task '" + attemptId + "' to " +
488              outputPath);
489        }
490      } else {
491        LOG.warn("No Output found for " + attemptId);
492      }
493    } else {
494      LOG.warn("Output Path is null in commitTask()");
495    }
496  }
497
498  /**
499   * Delete the work directory
500   * @throws IOException 
501   */
502  @Override
503  public void abortTask(TaskAttemptContext context) throws IOException {
504    abortTask(context, null);
505  }
506
507  @Private
508  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
509    if (hasOutputPath()) { 
510      context.progress();
511      if(taskAttemptPath == null) {
512        taskAttemptPath = getTaskAttemptPath(context);
513      }
514      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
515      if(!fs.delete(taskAttemptPath, true)) {
516        LOG.warn("Could not delete "+taskAttemptPath);
517      }
518    } else {
519      LOG.warn("Output Path is null in abortTask()");
520    }
521  }
522
523  /**
524   * Did this task write any files in the work directory?
525   * @param context the task's context
526   */
527  @Override
528  public boolean needsTaskCommit(TaskAttemptContext context
529                                 ) throws IOException {
530    return needsTaskCommit(context, null);
531  }
532
533  @Private
534  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
535    ) throws IOException {
536    if(hasOutputPath()) {
537      if(taskAttemptPath == null) {
538        taskAttemptPath = getTaskAttemptPath(context);
539      }
540      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
541      return fs.exists(taskAttemptPath);
542    }
543    return false;
544  }
545
546  @Override
547  @Deprecated
548  public boolean isRecoverySupported() {
549    return true;
550  }
551  
552  @Override
553  public void recoverTask(TaskAttemptContext context)
554      throws IOException {
555    if(hasOutputPath()) {
556      context.progress();
557      TaskAttemptID attemptId = context.getTaskAttemptID();
558      int previousAttempt = getAppAttemptId(context) - 1;
559      if (previousAttempt < 0) {
560        throw new IOException ("Cannot recover task output for first attempt...");
561      }
562
563      Path previousCommittedTaskPath = getCommittedTaskPath(
564          previousAttempt, context);
565      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
566      if (LOG.isDebugEnabled()) {
567        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
568      }
569      if (algorithmVersion == 1) {
570        if (fs.exists(previousCommittedTaskPath)) {
571          Path committedTaskPath = getCommittedTaskPath(context);
572          if (fs.exists(committedTaskPath)) {
573            if (!fs.delete(committedTaskPath, true)) {
574              throw new IOException("Could not delete "+committedTaskPath);
575            }
576          }
577          //Rename can fail if the parent directory does not yet exist.
578          Path committedParent = committedTaskPath.getParent();
579          fs.mkdirs(committedParent);
580          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
581            throw new IOException("Could not rename " + previousCommittedTaskPath +
582                " to " + committedTaskPath);
583          }
584        } else {
585            LOG.warn(attemptId+" had no output to recover.");
586        }
587      } else {
588        // essentially a no-op, but for backwards compatibility
589        // after upgrade to the new fileOutputCommitter,
590        // check if there are any output left in committedTaskPath
591        if (fs.exists(previousCommittedTaskPath)) {
592          LOG.info("Recovering task for upgrading scenario, moving files from "
593              + previousCommittedTaskPath + " to " + outputPath);
594          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
595          mergePaths(fs, from, outputPath);
596        }
597        LOG.info("Done recovering task " + attemptId);
598      }
599    } else {
600      LOG.warn("Output Path is null in recoverTask()");
601    }
602  }
603}