/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 *   <li>
 *   Set up the job during initialization. For example, create the temporary
 *   output directory for the job.
 *   </li>
 *   <li>
 *   Clean up the job after completion. For example, remove the temporary
 *   output directory once the job is done.
 *   </li>
 *   <li>
 *   Set up the task's temporary output.
 *   </li>
 *   <li>
 *   Check whether a task needs a commit, so the commit procedure can be
 *   skipped for tasks that do not need one.
 *   </li>
 *   <li>
 *   Commit the task output.
 *   </li>
 *   <li>
 *   Discard the task commit.
 *   </li>
 * </ol>
 * The methods in this class can be called from several different processes and
 * from several different contexts.  It is important to know which process and
 * which context each is called from, so each method should be marked
 * accordingly in its documentation.  It is also important to note that not all
 * methods are guaranteed to be called once and only once.  If a method is not
 * guaranteed to have this property, the output committer needs to handle that
 * appropriately.  Also note that it is only in rare situations that a method
 * may be called multiple times for the same task.
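 *
 * <p>As an illustration only (a sketch, not a prescribed implementation), a
 * minimal committer might stage each task attempt's output under a job-level
 * temporary directory and promote it on commit. The configuration key
 * <code>illustration.output.dir</code> and the <code>_temporary</code> layout
 * below are assumptions of this sketch, not part of the
 * <code>OutputCommitter</code> contract:</p>
 * <pre>{@code
 * // Sketch; uses org.apache.hadoop.conf.Configuration and
 * // org.apache.hadoop.fs.{FileSystem, Path}.
 * public class SketchOutputCommitter extends OutputCommitter {
 *   private Path tempDir(Configuration conf) {
 *     return new Path(conf.get("illustration.output.dir", "/tmp/out"),
 *         "_temporary");
 *   }
 *
 *   private Path attemptDir(TaskAttemptContext task) {
 *     Configuration conf = task.getConfiguration();
 *     return new Path(tempDir(conf), task.getTaskAttemptID().toString());
 *   }
 *
 *   public void setupJob(JobContext job) throws IOException {
 *     Configuration conf = job.getConfiguration();
 *     tempDir(conf).getFileSystem(conf).mkdirs(tempDir(conf));
 *   }
 *
 *   public void setupTask(TaskAttemptContext task) throws IOException {
 *     // Attempt directories are created lazily when the task first writes.
 *   }
 *
 *   public boolean needsTaskCommit(TaskAttemptContext task)
 *       throws IOException {
 *     Path dir = attemptDir(task);
 *     return dir.getFileSystem(task.getConfiguration()).exists(dir);
 *   }
 *
 *   public void commitTask(TaskAttemptContext task) throws IOException {
 *     // Rename the attempt's directory to an attempt-independent name.
 *     Configuration conf = task.getConfiguration();
 *     Path committed = new Path(tempDir(conf),
 *         task.getTaskAttemptID().getTaskID().toString());
 *     if (!attemptDir(task).getFileSystem(conf)
 *         .rename(attemptDir(task), committed)) {
 *       throw new IOException("Could not commit " + attemptDir(task));
 *     }
 *   }
 *
 *   public void abortTask(TaskAttemptContext task) throws IOException {
 *     Path dir = attemptDir(task);
 *     dir.getFileSystem(task.getConfiguration()).delete(dir, true);
 *   }
 * }
 * }</pre>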
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter 
                extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
   * For the framework to set up the job output during initialization. This is
   * called from the application master process for the entire job. It will be
   * called multiple times, once per job attempt.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion. This is called
   * from the application master process for the entire job. It may be called
   * multiple times.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or 
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked for jobs whose final run state is SUCCESSFUL. This is
   * called from the application master process for the entire job. It is
   * guaranteed to be called only once; if it throws an exception the entire
   * job will fail.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
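    // Delegates to the deprecated cleanupJob(JobContext) by default so that
    // committers written before commitJob existed keep working unchanged.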
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for 
   * jobs whose final run state is {@link JobStatus#FAILED} or 
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. It may be called multiple times.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job
   * @throws IOException if the job abort fails
   */
  public void abortJob(JobContext jobContext, int status) 
  throws IOException {
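    // Like commitJob, the default falls back to the deprecated cleanupJob
    // for backwards compatibility with older committers.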
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task. It
   * may be called multiple times for the same task, but for different task
   * attempts.
   * 
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the task output setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Check whether the task needs a commit. This is called from each individual
   * task's process that will output to HDFS, and it is called just for that
   * task.
   * 
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task needs a commit; <code>false</code>
   *         otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * To promote the task's temporary output to its final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * task is the one the AM determines to have finished first, this method
   * is called to commit the individual task's output. This marks that
   * task's output as complete; {@link #commitJob(JobContext)} will also
   * be called later if the entire job finishes successfully. This is
   * called from a task's process. It may be called multiple times for the
   * same task, but for different task attempts. It should be very rare for
   * this to be called multiple times, requiring odd networking failures to
   * happen. In the future the Hadoop framework may eliminate this race.
   * 
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Discard the task output. This is called from a task's process to clean 
   * up a single task's output that has not yet been committed. It may be
   * called multiple times for the same task, but for different task attempts.
   * 
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if the task output cleanup fails
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Indicates whether task output recovery is supported across application
   * attempts. The default is <code>false</code>: task output is not
   * recovered, and tasks are rerun after an application master restart.
   */
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Recover the task output.
   * 
   * The retry-count for the job will be passed via the 
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in 
   * {@link TaskAttemptContext#getConfiguration()} for the 
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   * 
   * If an exception is thrown the task will be attempted again.
   * 
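   * <p>For illustration only (a sketch, not prescribed behavior), a
   * recovering committer might read the previous attempt number and look for
   * output that attempt already committed:</p>
   * <pre>{@code
   * int attemptId = taskContext.getConfiguration()
   *     .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   * // Check for output committed under attemptId - 1 and, if it is present
   * // and complete, re-register it instead of rerunning the task.
   * }</pre>
   * 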
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext) 
  throws IOException {
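    // Default: no-op. Committers that support recovery must override this
    // (and isRecoverySupported()) to reattach previously committed output.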
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                             ) throws IOException {
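    // Downcast to the old-API context; when an old-API committer is in use
    // the framework passes org.apache.hadoop.mapred context implementations.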
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
   *             instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                               ) throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
                             ) throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
                             org.apache.hadoop.mapreduce.JobStatus.State runState) 
  throws IOException {
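    // Convert the new-API enum to the old-API int constant before
    // dispatching to the old abortJob(JobContext, int) overload.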
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state: " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean 
    needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                    ) throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
      ) throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

}