/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 *   <li>
 *   Set up the job during initialization. For example, create the temporary
 *   output directory for the job during job initialization.
 *   </li>
 *   <li>
 *   Clean up the job after job completion. For example, remove the
 *   temporary output directory after job completion.
 *   </li>
 *   <li>
 *   Set up the task's temporary output.
 *   </li>
 *   <li>
 *   Check whether a task needs a commit. This avoids the commit
 *   procedure for tasks that have nothing to commit.
 *   </li>
 *   <li>
 *   Commit the task output.
 *   </li>
 *   <li>
 *   Discard the task commit.
 *   </li>
 * </ol>
 * The methods in this class can be called from several different processes and
 * from several different contexts.  It is important to know which process and
 * which context each is called from.  Each method should be marked accordingly
 * in its documentation.  It is also important to note that not all methods are
 * guaranteed to be called once and only once.  If a method is not guaranteed to
 * have this property the output committer needs to handle this appropriately.
 * Also note that it is only in rare situations that they may be called
 * multiple times for the same task.
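 *
 * <p>A minimal sketch of a committer for output that is written in place and
 * needs no commit step (the class name and no-op bodies below are purely
 * illustrative, not part of this API):</p>
 * <pre>{@code
 * public class NoOpOutputCommitter extends OutputCommitter {
 *   public void setupJob(JobContext jobContext) { }    // nothing to set up
 *   public void setupTask(TaskAttemptContext taskContext) { }
 *   public boolean needsTaskCommit(TaskAttemptContext taskContext) {
 *     return false;  // output is already final, so skip the commit path
 *   }
 *   public void commitTask(TaskAttemptContext taskContext) { }
 *   public void abortTask(TaskAttemptContext taskContext) { }
 * }
 * }</pre>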
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
                extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
   * For the framework to set up the job output during initialization.  This is
   * called from the application master process for the entire job. It may be
   * called multiple times, once per job attempt.
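   *
   * <p>For example, an implementation might create a job-level temporary
   * directory here; the path below is illustrative only, not part of this
   * API:</p>
   * <pre>{@code
   * public void setupJob(JobContext jobContext) throws IOException {
   *   Path jobTmpDir = new Path("/user/out/_temporary");  // hypothetical layout
   *   FileSystem fs = jobTmpDir.getFileSystem(jobContext.getConfiguration());
   *   fs.mkdirs(jobTmpDir);
   * }
   * }</pre>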
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion.  This is called
   * from the application master process for the entire job. It may be called
   * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or
   *                 {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked only for jobs whose final run state is
   * {@link JobStatus#SUCCEEDED}.  This is called from the application master
   * process for the entire job. It is guaranteed to be called only once.  If
   * it throws an exception the entire job will fail.
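   *
   * <p>For example, an implementation might publish a completion marker in the
   * job's output directory, in the spirit of <code>FileOutputCommitter</code>;
   * the path and file name below are illustrative only:</p>
   * <pre>{@code
   * public void commitJob(JobContext jobContext) throws IOException {
   *   Path outDir = new Path("/user/out");  // hypothetical final output dir
   *   FileSystem fs = outDir.getFileSystem(jobContext.getConfiguration());
   *   fs.create(new Path(outDir, "_SUCCESS")).close();  // empty marker file
   * }
   * }</pre>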
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs whose final run state is {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. It may be called multiple times.
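   *
   * <p>For example, an implementation might discard the job's temporary
   * directory here; the path below is illustrative only:</p>
   * <pre>{@code
   * public void abortJob(JobContext jobContext, int status) throws IOException {
   *   Path jobTmpDir = new Path("/user/out/_temporary");  // hypothetical layout
   *   FileSystem fs = jobTmpDir.getFileSystem(jobContext.getConfiguration());
   *   fs.delete(jobTmpDir, true);  // recursively remove uncommitted output
   * }
   * }</pre>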
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job
   * @throws IOException if the abort fails
   */
  public void abortJob(JobContext jobContext, int status)
  throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task. It
   * may be called multiple times for the same task, but for different task
   * attempts.
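   *
   * <p>For example, an implementation might create a per-attempt temporary
   * directory; the layout below is illustrative only:</p>
   * <pre>{@code
   * public void setupTask(TaskAttemptContext taskContext) throws IOException {
   *   Path attemptDir = new Path("/user/out/_temporary/"
   *       + taskContext.getTaskAttemptID());  // hypothetical per-attempt dir
   *   FileSystem fs = attemptDir.getFileSystem(taskContext.getConfiguration());
   *   fs.mkdirs(attemptDir);
   * }
   * }</pre>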
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if task setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Check whether the task needs a commit.  This is called from each
   * individual task's process that will output to HDFS, and it is called just
   * for that task.
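   *
   * <p>For example, an implementation might commit only attempts that actually
   * produced temporary output; the layout below is illustrative only:</p>
   * <pre>{@code
   * public boolean needsTaskCommit(TaskAttemptContext taskContext)
   *     throws IOException {
   *   Path attemptDir = new Path("/user/out/_temporary/"
   *       + taskContext.getTaskAttemptID());  // hypothetical per-attempt dir
   *   FileSystem fs = attemptDir.getFileSystem(taskContext.getConfiguration());
   *   return fs.exists(attemptDir);  // commit only if something was written
   * }
   * }</pre>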
   *
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task needs a commit, <code>false</code>
   *         otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * To promote the task's temporary output to its final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * task attempt is the one that the AM determines finished first, this method
   * is called to commit that individual task's output.  This marks the
   * task's output as complete; {@link #commitJob(JobContext)} will
   * also be called later on if the entire job finishes successfully. This
   * is called from a task's process. It may be called multiple times for the
   * same task, but for different task attempts.  It should be very rare for
   * this to be called multiple times, and it requires odd networking failures
   * to make that happen. In the future the Hadoop framework may eliminate this
   * race.
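   *
   * <p>For example, an implementation might rename the attempt's temporary
   * directory to its final, per-task name; the layout below is illustrative
   * only:</p>
   * <pre>{@code
   * public void commitTask(TaskAttemptContext taskContext) throws IOException {
   *   Path attemptDir = new Path("/user/out/_temporary/"
   *       + taskContext.getTaskAttemptID());  // hypothetical per-attempt dir
   *   Path finalDir = new Path("/user/out/"
   *       + taskContext.getTaskAttemptID().getTaskID());
   *   FileSystem fs = attemptDir.getFileSystem(taskContext.getConfiguration());
   *   if (!fs.rename(attemptDir, finalDir)) {
   *     throw new IOException("Could not commit " + attemptDir);
   *   }
   * }
   * }</pre>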
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the commit fails
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Discard the task output. This is called from a task's process to clean
   * up a single task's output that has not yet been committed. It may be
   * called multiple times for the same task, but for different task attempts.
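   *
   * <p>For example, an implementation might delete the attempt's temporary
   * directory; the layout below is illustrative only:</p>
   * <pre>{@code
   * public void abortTask(TaskAttemptContext taskContext) throws IOException {
   *   Path attemptDir = new Path("/user/out/_temporary/"
   *       + taskContext.getTaskAttemptID());  // hypothetical per-attempt dir
   *   FileSystem fs = attemptDir.getFileSystem(taskContext.getConfiguration());
   *   fs.delete(attemptDir, true);  // recursively discard uncommitted output
   * }
   * }</pre>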
   *
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if the task output could not be discarded
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Is task output recovery supported for restarting jobs? This deprecated
   * no-argument variant is retained for compatibility with the old API and
   * reports that recovery is not supported.
   *
   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
   */
  @Deprecated
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Is task output recovery supported for restarting jobs?
   *
   * If task output recovery is supported, job restart can be done more
   * efficiently.
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @return <code>true</code> if task output recovery is supported,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   * @see #recoverTask(TaskAttemptContext)
   */
  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
    return isRecoverySupported();
  }

  /**
   * Returns true if an in-progress job commit can be retried. If the MR AM is
   * re-run, it will check this value to determine whether it can retry an
   * in-progress commit that was started by a previous version.
   * Note that in rare scenarios the previous AM version might still be running
   * at that time, due to system anomalies. Hence if this method returns true,
   * the retried commit operation must be able to run concurrently with
   * the previous operation.
   *
   * If repeatable job commit is supported, job restart can tolerate previous
   * AM failures during job commit.
   *
   * By default, it is not supported. Subclasses (such as
   * <code>FileOutputCommitter</code>) should explicitly override it if they
   * provide support.
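   *
   * <p>A sketch of an override for a committer whose <code>commitJob</code> is
   * idempotent (illustrative only; whether this is safe depends entirely on
   * the committer's commit protocol):</p>
   * <pre>{@code
   * @Override
   * public boolean isCommitJobRepeatable(JobContext jobContext)
   *     throws IOException {
   *   // Safe only if commitJob tolerates a concurrent run from a previous AM.
   *   return true;
   * }
   * }</pre>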
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @return <code>true</code> if repeatable job commit is supported,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public boolean isCommitJobRepeatable(JobContext jobContext) throws
      IOException {
    return false;
  }

  @Override
  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
      jobContext) throws IOException {
    return isCommitJobRepeatable((JobContext) jobContext);
  }

  /**
   * Recover the task output.
   *
   * The retry count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   *
   * If an exception is thrown, the task will be attempted again.
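   *
   * <p>A sketch of a recovery that moves output committed under the previous
   * application attempt into place for the current one; the per-attempt
   * directory layout is purely illustrative, and it assumes
   * <code>MRConstants</code> is accessible to the subclass:</p>
   * <pre>{@code
   * public void recoverTask(TaskAttemptContext taskContext) throws IOException {
   *   Configuration conf = taskContext.getConfiguration();
   *   int attemptId = conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   *   String task = taskContext.getTaskAttemptID().getTaskID().toString();
   *   Path previous = new Path("/user/out/" + (attemptId - 1), task);
   *   Path current = new Path("/user/out/" + attemptId, task);
   *   FileSystem fs = previous.getFileSystem(conf);
   *   if (fs.exists(previous) && !fs.rename(previous, current)) {
   *     throw new IOException("Could not recover " + previous);
   *   }
   * }
   * }</pre>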
   *
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext)
  throws IOException {
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                             ) throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
   *             instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                               ) throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
                             ) throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
                             org.apache.hadoop.mapreduce.JobStatus.State runState)
  throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state: " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final
  void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean
    needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                    ) throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final
  void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final
  void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final
  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
      ) throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean isRecoverySupported(
      org.apache.hadoop.mapreduce.JobContext context) throws IOException {
    return isRecoverySupported((JobContext) context);
  }

}