001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceAudience.Private;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.fs.PathFilter;
032import org.apache.hadoop.mapreduce.JobContext;
033import org.apache.hadoop.mapreduce.JobStatus;
034import org.apache.hadoop.mapreduce.MRJobConfig;
035import org.apache.hadoop.mapreduce.OutputCommitter;
036import org.apache.hadoop.mapreduce.TaskAttemptContext;
037import org.apache.hadoop.mapreduce.TaskAttemptID;
038
039/** An {@link OutputCommitter} that commits files specified 
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042@InterfaceAudience.Public
043@InterfaceStability.Stable
044public class FileOutputCommitter extends OutputCommitter {
045  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046
047  /** 
048   * Name of directory where pending data is placed.  Data that has not been
049   * committed yet.
050   */
051  public static final String PENDING_DIR_NAME = "_temporary";
052  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
053  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
054    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
055  private Path outputPath = null;
056  private Path workPath = null;
057
058  /**
059   * Create a file output committer
060   * @param outputPath the job's output path, or null if you want the output
061   * committer to act as a noop.
062   * @param context the task's context
063   * @throws IOException
064   */
065  public FileOutputCommitter(Path outputPath, 
066                             TaskAttemptContext context) throws IOException {
067    this(outputPath, (JobContext)context);
068    if (outputPath != null) {
069      workPath = getTaskAttemptPath(context, outputPath);
070    }
071  }
072  
073  /**
074   * Create a file output committer
075   * @param outputPath the job's output path, or null if you want the output
076   * committer to act as a noop.
077   * @param context the task's context
078   * @throws IOException
079   */
080  @Private
081  public FileOutputCommitter(Path outputPath, 
082                             JobContext context) throws IOException {
083    if (outputPath != null) {
084      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
085      this.outputPath = fs.makeQualified(outputPath);
086    }
087  }
088  
089  /**
090   * @return the path where final output of the job should be placed.  This
091   * could also be considered the committed application attempt path.
092   */
093  private Path getOutputPath() {
094    return this.outputPath;
095  }
096  
097  /**
098   * @return true if we have an output path set, else false.
099   */
100  private boolean hasOutputPath() {
101    return this.outputPath != null;
102  }
103  
104  /**
105   * @return the path where the output of pending job attempts are
106   * stored.
107   */
108  private Path getPendingJobAttemptsPath() {
109    return getPendingJobAttemptsPath(getOutputPath());
110  }
111  
112  /**
113   * Get the location of pending job attempts.
114   * @param out the base output directory.
115   * @return the location of pending job attempts.
116   */
117  private static Path getPendingJobAttemptsPath(Path out) {
118    return new Path(out, PENDING_DIR_NAME);
119  }
120  
121  /**
122   * Get the Application Attempt Id for this job
123   * @param context the context to look in
124   * @return the Application Attempt Id for a given job.
125   */
126  private static int getAppAttemptId(JobContext context) {
127    return context.getConfiguration().getInt(
128        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
129  }
130  
131  /**
132   * Compute the path where the output of a given job attempt will be placed. 
133   * @param context the context of the job.  This is used to get the
134   * application attempt id.
135   * @return the path to store job attempt data.
136   */
137  public Path getJobAttemptPath(JobContext context) {
138    return getJobAttemptPath(context, getOutputPath());
139  }
140  
141  /**
142   * Compute the path where the output of a given job attempt will be placed. 
143   * @param context the context of the job.  This is used to get the
144   * application attempt id.
145   * @param out the output path to place these in.
146   * @return the path to store job attempt data.
147   */
148  public static Path getJobAttemptPath(JobContext context, Path out) {
149    return getJobAttemptPath(getAppAttemptId(context), out);
150  }
151  
152  /**
153   * Compute the path where the output of a given job attempt will be placed. 
154   * @param appAttemptId the ID of the application attempt for this job.
155   * @return the path to store job attempt data.
156   */
157  private Path getJobAttemptPath(int appAttemptId) {
158    return getJobAttemptPath(appAttemptId, getOutputPath());
159  }
160  
161  /**
162   * Compute the path where the output of a given job attempt will be placed. 
163   * @param appAttemptId the ID of the application attempt for this job.
164   * @return the path to store job attempt data.
165   */
166  private static Path getJobAttemptPath(int appAttemptId, Path out) {
167    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
168  }
169  
170  /**
171   * Compute the path where the output of pending task attempts are stored.
172   * @param context the context of the job with pending tasks. 
173   * @return the path where the output of pending task attempts are stored.
174   */
175  private Path getPendingTaskAttemptsPath(JobContext context) {
176    return getPendingTaskAttemptsPath(context, getOutputPath());
177  }
178  
179  /**
180   * Compute the path where the output of pending task attempts are stored.
181   * @param context the context of the job with pending tasks. 
182   * @return the path where the output of pending task attempts are stored.
183   */
184  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
185    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
186  }
187  
188  /**
189   * Compute the path where the output of a task attempt is stored until
190   * that task is committed.
191   * 
192   * @param context the context of the task attempt.
193   * @return the path where a task attempt should be stored.
194   */
195  public Path getTaskAttemptPath(TaskAttemptContext context) {
196    return new Path(getPendingTaskAttemptsPath(context), 
197        String.valueOf(context.getTaskAttemptID()));
198  }
199  
200  /**
201   * Compute the path where the output of a task attempt is stored until
202   * that task is committed.
203   * 
204   * @param context the context of the task attempt.
205   * @param out The output path to put things in.
206   * @return the path where a task attempt should be stored.
207   */
208  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
209    return new Path(getPendingTaskAttemptsPath(context, out), 
210        String.valueOf(context.getTaskAttemptID()));
211  }
212  
213  /**
214   * Compute the path where the output of a committed task is stored until
215   * the entire job is committed.
216   * @param context the context of the task attempt
217   * @return the path where the output of a committed task is stored until
218   * the entire job is committed.
219   */
220  public Path getCommittedTaskPath(TaskAttemptContext context) {
221    return getCommittedTaskPath(getAppAttemptId(context), context);
222  }
223  
224  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
225    return getCommittedTaskPath(getAppAttemptId(context), context, out);
226  }
227  
228  /**
229   * Compute the path where the output of a committed task is stored until the
230   * entire job is committed for a specific application attempt.
231   * @param appAttemptId the id of the application attempt to use
232   * @param context the context of any task.
233   * @return the path where the output of a committed task is stored.
234   */
235  private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
236    return new Path(getJobAttemptPath(appAttemptId),
237        String.valueOf(context.getTaskAttemptID().getTaskID()));
238  }
239  
240  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
241    return new Path(getJobAttemptPath(appAttemptId, out),
242        String.valueOf(context.getTaskAttemptID().getTaskID()));
243  }
244  
245  private static class CommittedTaskFilter implements PathFilter {
246    @Override
247    public boolean accept(Path path) {
248      return !PENDING_DIR_NAME.equals(path.getName());
249    }
250  }
251  
252  /**
253   * Get a list of all paths where output from committed tasks are stored.
254   * @param context the context of the current job
255   * @return the list of these Paths/FileStatuses. 
256   * @throws IOException
257   */
258  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
259    throws IOException {
260    Path jobAttemptPath = getJobAttemptPath(context);
261    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
262    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
263  }
264  
265  /**
266   * Get the directory that the task should write results into.
267   * @return the work directory
268   * @throws IOException
269   */
270  public Path getWorkPath() throws IOException {
271    return workPath;
272  }
273
274  /**
275   * Create the temporary directory that is the root of all of the task 
276   * work directories.
277   * @param context the job's context
278   */
279  public void setupJob(JobContext context) throws IOException {
280    if (hasOutputPath()) {
281      Path jobAttemptPath = getJobAttemptPath(context);
282      FileSystem fs = jobAttemptPath.getFileSystem(
283          context.getConfiguration());
284      if (!fs.mkdirs(jobAttemptPath)) {
285        LOG.error("Mkdirs failed to create " + jobAttemptPath);
286      }
287    } else {
288      LOG.warn("Output Path is null in setupJob()");
289    }
290  }
291  
292  /**
293   * The job has completed so move all committed tasks to the final output dir.
294   * Delete the temporary directory, including all of the work directories.
295   * Create a _SUCCESS file to make it as successful.
296   * @param context the job's context
297   */
298  public void commitJob(JobContext context) throws IOException {
299    if (hasOutputPath()) {
300      Path finalOutput = getOutputPath();
301      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
302      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
303        mergePaths(fs, stat, finalOutput);
304      }
305
306      // delete the _temporary folder and create a _done file in the o/p folder
307      cleanupJob(context);
308      // True if the job requires output.dir marked on successful job.
309      // Note that by default it is set to true.
310      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
311        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
312        fs.create(markerPath).close();
313      }
314    } else {
315      LOG.warn("Output Path is null in commitJob()");
316    }
317  }
318
319  /**
320   * Merge two paths together.  Anything in from will be moved into to, if there
321   * are any name conflicts while merging the files or directories in from win.
322   * @param fs the File System to use
323   * @param from the path data is coming from.
324   * @param to the path data is going to.
325   * @throws IOException on any error
326   */
327  private static void mergePaths(FileSystem fs, final FileStatus from,
328      final Path to)
329    throws IOException {
330     LOG.debug("Merging data from "+from+" to "+to);
331     if(from.isFile()) {
332       if(fs.exists(to)) {
333         if(!fs.delete(to, true)) {
334           throw new IOException("Failed to delete "+to);
335         }
336       }
337
338       if(!fs.rename(from.getPath(), to)) {
339         throw new IOException("Failed to rename "+from+" to "+to);
340       }
341     } else if(from.isDirectory()) {
342       if(fs.exists(to)) {
343         FileStatus toStat = fs.getFileStatus(to);
344         if(!toStat.isDirectory()) {
345           if(!fs.delete(to, true)) {
346             throw new IOException("Failed to delete "+to);
347           }
348           if(!fs.rename(from.getPath(), to)) {
349             throw new IOException("Failed to rename "+from+" to "+to);
350           }
351         } else {
352           //It is a directory so merge everything in the directories
353           for(FileStatus subFrom: fs.listStatus(from.getPath())) {
354             Path subTo = new Path(to, subFrom.getPath().getName());
355             mergePaths(fs, subFrom, subTo);
356           }
357         }
358       } else {
359         //it does not exist just rename
360         if(!fs.rename(from.getPath(), to)) {
361           throw new IOException("Failed to rename "+from+" to "+to);
362         }
363       }
364     }
365  }
366
367  @Override
368  @Deprecated
369  public void cleanupJob(JobContext context) throws IOException {
370    if (hasOutputPath()) {
371      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
372      FileSystem fs = pendingJobAttemptsPath
373          .getFileSystem(context.getConfiguration());
374      fs.delete(pendingJobAttemptsPath, true);
375    } else {
376      LOG.warn("Output Path is null in cleanupJob()");
377    }
378  }
379
380  /**
381   * Delete the temporary directory, including all of the work directories.
382   * @param context the job's context
383   */
384  @Override
385  public void abortJob(JobContext context, JobStatus.State state) 
386  throws IOException {
387    // delete the _temporary folder
388    cleanupJob(context);
389  }
390  
391  /**
392   * No task setup required.
393   */
394  @Override
395  public void setupTask(TaskAttemptContext context) throws IOException {
396    // FileOutputCommitter's setupTask doesn't do anything. Because the
397    // temporary task directory is created on demand when the 
398    // task is writing.
399  }
400
401  /**
402   * Move the files from the work directory to the job output directory
403   * @param context the task context
404   */
405  @Override
406  public void commitTask(TaskAttemptContext context) 
407  throws IOException {
408    commitTask(context, null);
409  }
410
411  @Private
412  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
413  throws IOException {
414    TaskAttemptID attemptId = context.getTaskAttemptID();
415    if (hasOutputPath()) {
416      context.progress();
417      if(taskAttemptPath == null) {
418        taskAttemptPath = getTaskAttemptPath(context);
419      }
420      Path committedTaskPath = getCommittedTaskPath(context);
421      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
422      if (fs.exists(taskAttemptPath)) {
423        if(fs.exists(committedTaskPath)) {
424          if(!fs.delete(committedTaskPath, true)) {
425            throw new IOException("Could not delete " + committedTaskPath);
426          }
427        }
428        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
429          throw new IOException("Could not rename " + taskAttemptPath + " to "
430              + committedTaskPath);
431        }
432        LOG.info("Saved output of task '" + attemptId + "' to " + 
433            committedTaskPath);
434      } else {
435        LOG.warn("No Output found for " + attemptId);
436      }
437    } else {
438      LOG.warn("Output Path is null in commitTask()");
439    }
440  }
441
442  /**
443   * Delete the work directory
444   * @throws IOException 
445   */
446  @Override
447  public void abortTask(TaskAttemptContext context) throws IOException {
448    abortTask(context, null);
449  }
450
451  @Private
452  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
453    if (hasOutputPath()) { 
454      context.progress();
455      if(taskAttemptPath == null) {
456        taskAttemptPath = getTaskAttemptPath(context);
457      }
458      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
459      if(!fs.delete(taskAttemptPath, true)) {
460        LOG.warn("Could not delete "+taskAttemptPath);
461      }
462    } else {
463      LOG.warn("Output Path is null in abortTask()");
464    }
465  }
466
467  /**
468   * Did this task write any files in the work directory?
469   * @param context the task's context
470   */
471  @Override
472  public boolean needsTaskCommit(TaskAttemptContext context
473                                 ) throws IOException {
474    return needsTaskCommit(context, null);
475  }
476
477  @Private
478  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
479    ) throws IOException {
480    if(hasOutputPath()) {
481      if(taskAttemptPath == null) {
482        taskAttemptPath = getTaskAttemptPath(context);
483      }
484      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
485      return fs.exists(taskAttemptPath);
486    }
487    return false;
488  }
489
490  @Override
491  public boolean isRecoverySupported() {
492    return true;
493  }
494  
495  @Override
496  public void recoverTask(TaskAttemptContext context)
497      throws IOException {
498    if(hasOutputPath()) {
499      context.progress();
500      TaskAttemptID attemptId = context.getTaskAttemptID();
501      int previousAttempt = getAppAttemptId(context) - 1;
502      if (previousAttempt < 0) {
503        throw new IOException ("Cannot recover task output for first attempt...");
504      }
505
506      Path committedTaskPath = getCommittedTaskPath(context);
507      Path previousCommittedTaskPath = getCommittedTaskPath(
508          previousAttempt, context);
509      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
510
511      LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
512          + " into " + committedTaskPath);
513      if (fs.exists(previousCommittedTaskPath)) {
514        if(fs.exists(committedTaskPath)) {
515          if(!fs.delete(committedTaskPath, true)) {
516            throw new IOException("Could not delete "+committedTaskPath);
517          }
518        }
519        //Rename can fail if the parent directory does not yet exist.
520        Path committedParent = committedTaskPath.getParent();
521        fs.mkdirs(committedParent);
522        if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
523          throw new IOException("Could not rename " + previousCommittedTaskPath +
524              " to " + committedTaskPath);
525        }
526        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
527      } else {
528        LOG.warn(attemptId+" had no output to recover.");
529      }
530    } else {
531      LOG.warn("Output Path is null in recoverTask()");
532    }
533  }
534}