001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceAudience.Private;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.fs.PathFilter;
032import org.apache.hadoop.mapreduce.JobContext;
033import org.apache.hadoop.mapreduce.JobStatus;
034import org.apache.hadoop.mapreduce.MRJobConfig;
035import org.apache.hadoop.mapreduce.OutputCommitter;
036import org.apache.hadoop.mapreduce.TaskAttemptContext;
037import org.apache.hadoop.mapreduce.TaskAttemptID;
038
039/** An {@link OutputCommitter} that commits files specified 
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042@InterfaceAudience.Public
043@InterfaceStability.Stable
044public class FileOutputCommitter extends OutputCommitter {
045  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046
047  /** 
048   * Name of directory where pending data is placed.  Data that has not been
049   * committed yet.
050   */
051  public static final String PENDING_DIR_NAME = "_temporary";
052  /**
053   * Temporary directory name 
054   *
055   * The static variable to be compatible with M/R 1.x
056   */
057  @Deprecated
058  protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
059  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
060  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
061    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
062  private Path outputPath = null;
063  private Path workPath = null;
064
065  /**
066   * Create a file output committer
067   * @param outputPath the job's output path, or null if you want the output
068   * committer to act as a noop.
069   * @param context the task's context
070   * @throws IOException
071   */
072  public FileOutputCommitter(Path outputPath, 
073                             TaskAttemptContext context) throws IOException {
074    this(outputPath, (JobContext)context);
075    if (outputPath != null) {
076      workPath = getTaskAttemptPath(context, outputPath);
077    }
078  }
079  
080  /**
081   * Create a file output committer
082   * @param outputPath the job's output path, or null if you want the output
083   * committer to act as a noop.
084   * @param context the task's context
085   * @throws IOException
086   */
087  @Private
088  public FileOutputCommitter(Path outputPath, 
089                             JobContext context) throws IOException {
090    if (outputPath != null) {
091      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
092      this.outputPath = fs.makeQualified(outputPath);
093    }
094  }
095  
096  /**
097   * @return the path where final output of the job should be placed.  This
098   * could also be considered the committed application attempt path.
099   */
100  private Path getOutputPath() {
101    return this.outputPath;
102  }
103  
104  /**
105   * @return true if we have an output path set, else false.
106   */
107  private boolean hasOutputPath() {
108    return this.outputPath != null;
109  }
110  
111  /**
112   * @return the path where the output of pending job attempts are
113   * stored.
114   */
115  private Path getPendingJobAttemptsPath() {
116    return getPendingJobAttemptsPath(getOutputPath());
117  }
118  
119  /**
120   * Get the location of pending job attempts.
121   * @param out the base output directory.
122   * @return the location of pending job attempts.
123   */
124  private static Path getPendingJobAttemptsPath(Path out) {
125    return new Path(out, PENDING_DIR_NAME);
126  }
127  
128  /**
129   * Get the Application Attempt Id for this job
130   * @param context the context to look in
131   * @return the Application Attempt Id for a given job.
132   */
133  private static int getAppAttemptId(JobContext context) {
134    return context.getConfiguration().getInt(
135        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
136  }
137  
138  /**
139   * Compute the path where the output of a given job attempt will be placed. 
140   * @param context the context of the job.  This is used to get the
141   * application attempt id.
142   * @return the path to store job attempt data.
143   */
144  public Path getJobAttemptPath(JobContext context) {
145    return getJobAttemptPath(context, getOutputPath());
146  }
147  
148  /**
149   * Compute the path where the output of a given job attempt will be placed. 
150   * @param context the context of the job.  This is used to get the
151   * application attempt id.
152   * @param out the output path to place these in.
153   * @return the path to store job attempt data.
154   */
155  public static Path getJobAttemptPath(JobContext context, Path out) {
156    return getJobAttemptPath(getAppAttemptId(context), out);
157  }
158  
159  /**
160   * Compute the path where the output of a given job attempt will be placed. 
161   * @param appAttemptId the ID of the application attempt for this job.
162   * @return the path to store job attempt data.
163   */
164  protected Path getJobAttemptPath(int appAttemptId) {
165    return getJobAttemptPath(appAttemptId, getOutputPath());
166  }
167  
168  /**
169   * Compute the path where the output of a given job attempt will be placed. 
170   * @param appAttemptId the ID of the application attempt for this job.
171   * @return the path to store job attempt data.
172   */
173  private static Path getJobAttemptPath(int appAttemptId, Path out) {
174    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
175  }
176  
177  /**
178   * Compute the path where the output of pending task attempts are stored.
179   * @param context the context of the job with pending tasks. 
180   * @return the path where the output of pending task attempts are stored.
181   */
182  private Path getPendingTaskAttemptsPath(JobContext context) {
183    return getPendingTaskAttemptsPath(context, getOutputPath());
184  }
185  
186  /**
187   * Compute the path where the output of pending task attempts are stored.
188   * @param context the context of the job with pending tasks. 
189   * @return the path where the output of pending task attempts are stored.
190   */
191  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
192    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
193  }
194  
195  /**
196   * Compute the path where the output of a task attempt is stored until
197   * that task is committed.
198   * 
199   * @param context the context of the task attempt.
200   * @return the path where a task attempt should be stored.
201   */
202  public Path getTaskAttemptPath(TaskAttemptContext context) {
203    return new Path(getPendingTaskAttemptsPath(context), 
204        String.valueOf(context.getTaskAttemptID()));
205  }
206  
207  /**
208   * Compute the path where the output of a task attempt is stored until
209   * that task is committed.
210   * 
211   * @param context the context of the task attempt.
212   * @param out The output path to put things in.
213   * @return the path where a task attempt should be stored.
214   */
215  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
216    return new Path(getPendingTaskAttemptsPath(context, out), 
217        String.valueOf(context.getTaskAttemptID()));
218  }
219  
220  /**
221   * Compute the path where the output of a committed task is stored until
222   * the entire job is committed.
223   * @param context the context of the task attempt
224   * @return the path where the output of a committed task is stored until
225   * the entire job is committed.
226   */
227  public Path getCommittedTaskPath(TaskAttemptContext context) {
228    return getCommittedTaskPath(getAppAttemptId(context), context);
229  }
230  
231  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
232    return getCommittedTaskPath(getAppAttemptId(context), context, out);
233  }
234  
235  /**
236   * Compute the path where the output of a committed task is stored until the
237   * entire job is committed for a specific application attempt.
238   * @param appAttemptId the id of the application attempt to use
239   * @param context the context of any task.
240   * @return the path where the output of a committed task is stored.
241   */
242  protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
243    return new Path(getJobAttemptPath(appAttemptId),
244        String.valueOf(context.getTaskAttemptID().getTaskID()));
245  }
246  
247  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
248    return new Path(getJobAttemptPath(appAttemptId, out),
249        String.valueOf(context.getTaskAttemptID().getTaskID()));
250  }
251  
252  private static class CommittedTaskFilter implements PathFilter {
253    @Override
254    public boolean accept(Path path) {
255      return !PENDING_DIR_NAME.equals(path.getName());
256    }
257  }
258  
259  /**
260   * Get a list of all paths where output from committed tasks are stored.
261   * @param context the context of the current job
262   * @return the list of these Paths/FileStatuses. 
263   * @throws IOException
264   */
265  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
266    throws IOException {
267    Path jobAttemptPath = getJobAttemptPath(context);
268    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
269    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
270  }
271  
272  /**
273   * Get the directory that the task should write results into.
274   * @return the work directory
275   * @throws IOException
276   */
277  public Path getWorkPath() throws IOException {
278    return workPath;
279  }
280
281  /**
282   * Create the temporary directory that is the root of all of the task 
283   * work directories.
284   * @param context the job's context
285   */
286  public void setupJob(JobContext context) throws IOException {
287    if (hasOutputPath()) {
288      Path jobAttemptPath = getJobAttemptPath(context);
289      FileSystem fs = jobAttemptPath.getFileSystem(
290          context.getConfiguration());
291      if (!fs.mkdirs(jobAttemptPath)) {
292        LOG.error("Mkdirs failed to create " + jobAttemptPath);
293      }
294    } else {
295      LOG.warn("Output Path is null in setupJob()");
296    }
297  }
298  
299  /**
300   * The job has completed so move all committed tasks to the final output dir.
301   * Delete the temporary directory, including all of the work directories.
302   * Create a _SUCCESS file to make it as successful.
303   * @param context the job's context
304   */
305  public void commitJob(JobContext context) throws IOException {
306    if (hasOutputPath()) {
307      Path finalOutput = getOutputPath();
308      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
309      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
310        mergePaths(fs, stat, finalOutput);
311      }
312
313      // delete the _temporary folder and create a _done file in the o/p folder
314      cleanupJob(context);
315      // True if the job requires output.dir marked on successful job.
316      // Note that by default it is set to true.
317      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
318        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
319        fs.create(markerPath).close();
320      }
321    } else {
322      LOG.warn("Output Path is null in commitJob()");
323    }
324  }
325
326  /**
327   * Merge two paths together.  Anything in from will be moved into to, if there
328   * are any name conflicts while merging the files or directories in from win.
329   * @param fs the File System to use
330   * @param from the path data is coming from.
331   * @param to the path data is going to.
332   * @throws IOException on any error
333   */
334  private static void mergePaths(FileSystem fs, final FileStatus from,
335      final Path to)
336    throws IOException {
337     LOG.debug("Merging data from "+from+" to "+to);
338     if(from.isFile()) {
339       if(fs.exists(to)) {
340         if(!fs.delete(to, true)) {
341           throw new IOException("Failed to delete "+to);
342         }
343       }
344
345       if(!fs.rename(from.getPath(), to)) {
346         throw new IOException("Failed to rename "+from+" to "+to);
347       }
348     } else if(from.isDirectory()) {
349       if(fs.exists(to)) {
350         FileStatus toStat = fs.getFileStatus(to);
351         if(!toStat.isDirectory()) {
352           if(!fs.delete(to, true)) {
353             throw new IOException("Failed to delete "+to);
354           }
355           if(!fs.rename(from.getPath(), to)) {
356             throw new IOException("Failed to rename "+from+" to "+to);
357           }
358         } else {
359           //It is a directory so merge everything in the directories
360           for(FileStatus subFrom: fs.listStatus(from.getPath())) {
361             Path subTo = new Path(to, subFrom.getPath().getName());
362             mergePaths(fs, subFrom, subTo);
363           }
364         }
365       } else {
366         //it does not exist just rename
367         if(!fs.rename(from.getPath(), to)) {
368           throw new IOException("Failed to rename "+from+" to "+to);
369         }
370       }
371     }
372  }
373
374  @Override
375  @Deprecated
376  public void cleanupJob(JobContext context) throws IOException {
377    if (hasOutputPath()) {
378      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
379      FileSystem fs = pendingJobAttemptsPath
380          .getFileSystem(context.getConfiguration());
381      fs.delete(pendingJobAttemptsPath, true);
382    } else {
383      LOG.warn("Output Path is null in cleanupJob()");
384    }
385  }
386
387  /**
388   * Delete the temporary directory, including all of the work directories.
389   * @param context the job's context
390   */
391  @Override
392  public void abortJob(JobContext context, JobStatus.State state) 
393  throws IOException {
394    // delete the _temporary folder
395    cleanupJob(context);
396  }
397  
398  /**
399   * No task setup required.
400   */
401  @Override
402  public void setupTask(TaskAttemptContext context) throws IOException {
403    // FileOutputCommitter's setupTask doesn't do anything. Because the
404    // temporary task directory is created on demand when the 
405    // task is writing.
406  }
407
408  /**
409   * Move the files from the work directory to the job output directory
410   * @param context the task context
411   */
412  @Override
413  public void commitTask(TaskAttemptContext context) 
414  throws IOException {
415    commitTask(context, null);
416  }
417
418  @Private
419  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
420  throws IOException {
421    TaskAttemptID attemptId = context.getTaskAttemptID();
422    if (hasOutputPath()) {
423      context.progress();
424      if(taskAttemptPath == null) {
425        taskAttemptPath = getTaskAttemptPath(context);
426      }
427      Path committedTaskPath = getCommittedTaskPath(context);
428      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
429      if (fs.exists(taskAttemptPath)) {
430        if(fs.exists(committedTaskPath)) {
431          if(!fs.delete(committedTaskPath, true)) {
432            throw new IOException("Could not delete " + committedTaskPath);
433          }
434        }
435        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
436          throw new IOException("Could not rename " + taskAttemptPath + " to "
437              + committedTaskPath);
438        }
439        LOG.info("Saved output of task '" + attemptId + "' to " + 
440            committedTaskPath);
441      } else {
442        LOG.warn("No Output found for " + attemptId);
443      }
444    } else {
445      LOG.warn("Output Path is null in commitTask()");
446    }
447  }
448
449  /**
450   * Delete the work directory
451   * @throws IOException 
452   */
453  @Override
454  public void abortTask(TaskAttemptContext context) throws IOException {
455    abortTask(context, null);
456  }
457
458  @Private
459  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
460    if (hasOutputPath()) { 
461      context.progress();
462      if(taskAttemptPath == null) {
463        taskAttemptPath = getTaskAttemptPath(context);
464      }
465      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
466      if(!fs.delete(taskAttemptPath, true)) {
467        LOG.warn("Could not delete "+taskAttemptPath);
468      }
469    } else {
470      LOG.warn("Output Path is null in abortTask()");
471    }
472  }
473
474  /**
475   * Did this task write any files in the work directory?
476   * @param context the task's context
477   */
478  @Override
479  public boolean needsTaskCommit(TaskAttemptContext context
480                                 ) throws IOException {
481    return needsTaskCommit(context, null);
482  }
483
484  @Private
485  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
486    ) throws IOException {
487    if(hasOutputPath()) {
488      if(taskAttemptPath == null) {
489        taskAttemptPath = getTaskAttemptPath(context);
490      }
491      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
492      return fs.exists(taskAttemptPath);
493    }
494    return false;
495  }
496
497  @Override
498  @Deprecated
499  public boolean isRecoverySupported() {
500    return true;
501  }
502  
503  @Override
504  public void recoverTask(TaskAttemptContext context)
505      throws IOException {
506    if(hasOutputPath()) {
507      context.progress();
508      TaskAttemptID attemptId = context.getTaskAttemptID();
509      int previousAttempt = getAppAttemptId(context) - 1;
510      if (previousAttempt < 0) {
511        throw new IOException ("Cannot recover task output for first attempt...");
512      }
513
514      Path committedTaskPath = getCommittedTaskPath(context);
515      Path previousCommittedTaskPath = getCommittedTaskPath(
516          previousAttempt, context);
517      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
518
519      LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
520          + " into " + committedTaskPath);
521      if (fs.exists(previousCommittedTaskPath)) {
522        if(fs.exists(committedTaskPath)) {
523          if(!fs.delete(committedTaskPath, true)) {
524            throw new IOException("Could not delete "+committedTaskPath);
525          }
526        }
527        //Rename can fail if the parent directory does not yet exist.
528        Path committedParent = committedTaskPath.getParent();
529        fs.mkdirs(committedParent);
530        if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
531          throw new IOException("Could not rename " + previousCommittedTaskPath +
532              " to " + committedTaskPath);
533        }
534        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
535      } else {
536        LOG.warn(attemptId+" had no output to recover.");
537      }
538    } else {
539      LOG.warn("Output Path is null in recoverTask()");
540    }
541  }
542}