001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025
026/**
027 * <code>OutputCommitter</code> describes the commit of task output for a 
028 * Map-Reduce job.
029 *
030 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 
031 * the job to:<p>
032 * <ol>
033 *   <li>
034 *   Setup the job during initialization. For example, create the temporary 
035 *   output directory for the job during the initialization of the job.
036 *   </li>
037 *   <li>
038 *   Cleanup the job after the job completion. For example, remove the
039 *   temporary output directory after the job completion. 
040 *   </li>
041 *   <li>
042 *   Setup the task temporary output.
043 *   </li> 
044 *   <li>
045 *   Check whether a task needs a commit. This is to avoid the commit
046 *   procedure if a task does not need commit.
047 *   </li>
048 *   <li>
049 *   Commit of the task output.
050 *   </li>  
051 *   <li>
052 *   Discard the task commit.
053 *   </li>
054 * </ol>
055 * The methods in this class can be called from several different processes and
056 * from several different contexts.  It is important to know which process and
057 * which context each is called from.  Each method should be marked accordingly
058 * in its documentation.  It is also important to note that not all methods are
059 * guaranteed to be called once and only once.  If a method is not guaranteed to
060 * have this property the output committer needs to handle this appropriately. 
061 * Also note it will only be in rare situations where they may be called 
062 * multiple times for the same task.
063 * 
064 * @see FileOutputCommitter 
065 * @see JobContext
066 * @see TaskAttemptContext 
067 */
068@InterfaceAudience.Public
069@InterfaceStability.Stable
070public abstract class OutputCommitter 
071                extends org.apache.hadoop.mapreduce.OutputCommitter {
072  /**
073   * For the framework to setup the job output during initialization.  This is
074   * called from the application master process for the entire job. This will be
075   * called multiple times, once per job attempt.
076   * 
077   * @param jobContext Context of the job whose output is being written.
078   * @throws IOException if temporary output could not be created
079   */
080  public abstract void setupJob(JobContext jobContext) throws IOException;
081
082  /**
083   * For cleaning up the job's output after job completion.  This is called
084   * from the application master process for the entire job. This may be called
085   * multiple times.
086   * 
087   * @param jobContext Context of the job whose output is being written.
088   * @throws IOException
089   * @deprecated Use {@link #commitJob(JobContext)} or 
090   *                 {@link #abortJob(JobContext, int)} instead.
091   */
092  @Deprecated
093  public void cleanupJob(JobContext jobContext) throws IOException { }
094
095  /**
096   * For committing job's output after successful job completion. Note that this
097   * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
098   * from the application master process for the entire job. This is guaranteed
099   * to only be called once.  If it throws an exception the entire job will
100   * fail.
101   * 
102   * @param jobContext Context of the job whose output is being written.
103   * @throws IOException 
104   */
105  public void commitJob(JobContext jobContext) throws IOException {
106    cleanupJob(jobContext);
107  }
108  
109  /**
110   * For aborting an unsuccessful job's output. Note that this is invoked for 
111   * jobs with final runstate as {@link JobStatus#FAILED} or 
112   * {@link JobStatus#KILLED}. This is called from the application
113   * master process for the entire job. This may be called multiple times.
114   * 
115   * @param jobContext Context of the job whose output is being written.
116   * @param status final runstate of the job
117   * @throws IOException
118   */
119  public void abortJob(JobContext jobContext, int status) 
120  throws IOException {
121    cleanupJob(jobContext);
122  }
123  
124  /**
125   * Sets up output for the task. This is called from each individual task's
126   * process that will output to HDFS, and it is called just for that task. This
127   * may be called multiple times for the same task, but for different task
128   * attempts.
129   * 
130   * @param taskContext Context of the task whose output is being written.
131   * @throws IOException
132   */
133  public abstract void setupTask(TaskAttemptContext taskContext)
134  throws IOException;
135  
136  /**
137   * Check whether task needs a commit.  This is called from each individual
138   * task's process that will output to HDFS, and it is called just for that
139   * task.
140   * 
141   * @param taskContext
142   * @return true/false
143   * @throws IOException
144   */
145  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
146  throws IOException;
147
148  /**
149   * To promote the task's temporary output to final output location.
150   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
151   * task is the task that the AM determines finished first, this method
152   * is called to commit an individual task's output.  This is to mark
153   * that tasks output as complete, as {@link #commitJob(JobContext)} will 
154   * also be called later on if the entire job finished successfully. This
155   * is called from a task's process. This may be called multiple times for the
156   * same task, but different task attempts.  It should be very rare for this to
157   * be called multiple times and requires odd networking failures to make this
158   * happen. In the future the Hadoop framework may eliminate this race.
159   * 
160   * @param taskContext Context of the task whose output is being written.
161   * @throws IOException if commit is not 
162   */
163  public abstract void commitTask(TaskAttemptContext taskContext)
164  throws IOException;
165  
166  /**
167   * Discard the task output. This is called from a task's process to clean 
168   * up a single task's output that can not yet been committed. This may be
169   * called multiple times for the same task, but for different task attempts.
170   * 
171   * @param taskContext
172   * @throws IOException
173   */
174  public abstract void abortTask(TaskAttemptContext taskContext)
175  throws IOException;
176
177  /**
178   * This method implements the new interface by calling the old method. Note
179   * that the input types are different between the new and old apis and this is
180   * a bridge between the two.
181   * 
182   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
183   */
184  @Deprecated
185  @Override
186  public boolean isRecoverySupported() {
187    return false;
188  }
189
190  /**
191   * Is task output recovery supported for restarting jobs?
192   * 
193   * If task output recovery is supported, job restart can be done more
194   * efficiently.
195   * 
196   * @param jobContext
197   *          Context of the job whose output is being written.
198   * @return <code>true</code> if task output recovery is supported,
199   *         <code>false</code> otherwise
200   * @throws IOException
201   * @see #recoverTask(TaskAttemptContext)
202   */
203  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
204    return isRecoverySupported();
205  }
206
207  /**
208   * Recover the task output. 
209   * 
210   * The retry-count for the job will be passed via the 
211   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in  
212   * {@link TaskAttemptContext#getConfiguration()} for the 
213   * <code>OutputCommitter</code>. This is called from the application master
214   * process, but it is called individually for each task.
215   * 
216   * If an exception is thrown the task will be attempted again. 
217   * 
218   * @param taskContext Context of the task whose output is being recovered
219   * @throws IOException
220   */
221  public void recoverTask(TaskAttemptContext taskContext) 
222  throws IOException {
223  }
224  
225  /**
226   * This method implements the new interface by calling the old method. Note
227   * that the input types are different between the new and old apis and this
228   * is a bridge between the two.
229   */
230  @Override
231  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
232                             ) throws IOException {
233    setupJob((JobContext) jobContext);
234  }
235
236  /**
237   * This method implements the new interface by calling the old method. Note
238   * that the input types are different between the new and old apis and this
239   * is a bridge between the two.
240   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
241   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
242   *             instead.
243   */
244  @Override
245  @Deprecated
246  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
247                               ) throws IOException {
248    cleanupJob((JobContext) context);
249  }
250
251  /**
252   * This method implements the new interface by calling the old method. Note
253   * that the input types are different between the new and old apis and this
254   * is a bridge between the two.
255   */
256  @Override
257  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
258                             ) throws IOException {
259    commitJob((JobContext) context);
260  }
261  
262  /**
263   * This method implements the new interface by calling the old method. Note
264   * that the input types are different between the new and old apis and this
265   * is a bridge between the two.
266   */
267  @Override
268  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
269                                   org.apache.hadoop.mapreduce.JobStatus.State runState) 
270  throws IOException {
271    int state = JobStatus.getOldNewJobRunState(runState);
272    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
273      throw new IOException ("Invalid job run state : " + runState.name());
274    }
275    abortJob((JobContext) context, state);
276  }
277  
278  /**
279   * This method implements the new interface by calling the old method. Note
280   * that the input types are different between the new and old apis and this
281   * is a bridge between the two.
282   */
283  @Override
284  public final 
285  void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
286                 ) throws IOException {
287    setupTask((TaskAttemptContext) taskContext);
288  }
289  
290  /**
291   * This method implements the new interface by calling the old method. Note
292   * that the input types are different between the new and old apis and this
293   * is a bridge between the two.
294   */
295  @Override
296  public final boolean 
297    needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
298                    ) throws IOException {
299    return needsTaskCommit((TaskAttemptContext) taskContext);
300  }
301
302  /**
303   * This method implements the new interface by calling the old method. Note
304   * that the input types are different between the new and old apis and this
305   * is a bridge between the two.
306   */
307  @Override
308  public final 
309  void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
310                  ) throws IOException {
311    commitTask((TaskAttemptContext) taskContext);
312  }
313  
314  /**
315   * This method implements the new interface by calling the old method. Note
316   * that the input types are different between the new and old apis and this
317   * is a bridge between the two.
318   */
319  @Override
320  public final 
321  void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
322                 ) throws IOException {
323    abortTask((TaskAttemptContext) taskContext);
324  }
325  
326  /**
327   * This method implements the new interface by calling the old method. Note
328   * that the input types are different between the new and old apis and this
329   * is a bridge between the two.
330   */
331  @Override
332  public final 
333  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
334      ) throws IOException {
335    recoverTask((TaskAttemptContext) taskContext);
336  }
337
338  /**
339   * This method implements the new interface by calling the old method. Note
340   * that the input types are different between the new and old apis and this is
341   * a bridge between the two.
342   */
343  @Override
344  public final boolean isRecoverySupported(
345      org.apache.hadoop.mapreduce.JobContext context) throws IOException {
346    return isRecoverySupported((JobContext) context);
347  }
348
349}