/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 * <li>
 * Set up the job during initialization. For example, create the temporary
 * output directory for the job.
 * </li>
 * <li>
 * Clean up the job after job completion. For example, remove the
 * temporary output directory.
 * </li>
 * <li>
 * Set up the task's temporary output.
 * </li>
 * <li>
 * Check whether a task needs a commit. This avoids the commit procedure
 * for tasks that do not need it.
 * </li>
 * <li>
 * Commit the task's output.
 * </li>
 * <li>
 * Discard the task's commit.
 * </li>
 * </ol>
 * The methods in this class can be called from several different processes and
 * from several different contexts. It is important to know which process and
 * which context each method is called from, so each method is marked
 * accordingly in its documentation. Note also that not all methods are
 * guaranteed to be called once and only once; if a method does not have this
 * guarantee, the output committer needs to handle repeated invocations
 * appropriately. Only in rare situations, however, will a method be called
 * multiple times for the same task.
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {

  /**
   * For the framework to set up the job output during initialization. This is
   * called from the application master process for the entire job. It will be
   * called multiple times, once per job attempt.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;
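  /*
   * A minimal sketch of a concrete setupJob, assuming output is staged under
   * a temporary directory before being committed. The "_temporary" name and
   * the outputPath field are illustrative, not part of this API:
   *
   *   @Override
   *   public void setupJob(JobContext context) throws IOException {
   *     // Assumed layout: a scratch directory under the job's output path.
   *     Path tmpDir = new Path(outputPath, "_temporary");
   *     FileSystem fs = tmpDir.getFileSystem(context.getJobConf());
   *     if (!fs.mkdirs(tmpDir)) {
   *       throw new IOException("Failed to create " + tmpDir);
   *     }
   *   }
   */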
  /**
   * For cleaning up the job's output after job completion. This is called
   * from the application master process for the entire job. It may be called
   * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked only for jobs whose final run state is SUCCEEDED. This is
   * called from the application master process for the entire job. It is
   * guaranteed to be called only once. If it throws an exception, the entire
   * job will fail.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs whose final run state is {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. It may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job
   * @throws IOException if abort fails
   */
  public void abortJob(JobContext jobContext, int status)
      throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task.
   * It may be called multiple times for the same task, but for different task
   * attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit. This is called from each individual
   * task's process that will output to HDFS, and it is called just for that
   * task.
   *
   * @param taskContext Context of the task whose output is being written.
   * @return true if the task needs a commit, false otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to its final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * task attempt is the one the AM determines finished first, this method is
   * called to commit the individual task's output. It marks that task's
   * output as complete; {@link #commitJob(JobContext)} will also be called
   * later if the entire job finishes successfully. This is called from a
   * task's process. It may be called multiple times for the same task, but
   * for different task attempts. It should be very rare for this to be called
   * multiple times; odd networking failures are required to make it happen.
   * In the future the Hadoop framework may eliminate this race.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;
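  /*
   * A minimal sketch of the task-side commit protocol, assuming each task
   * attempt writes under its own temporary directory. The getAttemptDir and
   * getCommittedDir helpers and the directory layout are illustrative, not
   * part of this API:
   *
   *   @Override
   *   public boolean needsTaskCommit(TaskAttemptContext context)
   *       throws IOException {
   *     Path attemptDir = getAttemptDir(context);  // hypothetical helper
   *     FileSystem fs = attemptDir.getFileSystem(context.getJobConf());
   *     // Commit only if this attempt actually produced output.
   *     return fs.exists(attemptDir);
   *   }
   *
   *   @Override
   *   public void commitTask(TaskAttemptContext context) throws IOException {
   *     Path attemptDir = getAttemptDir(context);
   *     FileSystem fs = attemptDir.getFileSystem(context.getJobConf());
   *     // Promote the attempt's output with a single rename.
   *     if (fs.exists(attemptDir)
   *         && !fs.rename(attemptDir, getCommittedDir(context))) {
   *       throw new IOException("Failed to commit " + attemptDir);
   *     }
   *   }
   */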
  /**
   * Discard the task output. This is called from a task's process to clean
   * up a single task's output that cannot yet be committed. It may be called
   * multiple times for the same task, but for different task attempts.
   *
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if abort fails
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Returns whether task output recovery is supported across restarts of the
   * application master. By default recovery is not supported.
   *
   * @return <code>true</code> if task output can be recovered,
   *         <code>false</code> otherwise
   */
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Recover the task output.
   *
   * The retry count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   *
   * If an exception is thrown the task will be attempted again.
   *
   * @param taskContext Context of the task whose output is being recovered.
   * @throws IOException if recovery fails
   */
  public void recoverTask(TaskAttemptContext taskContext)
      throws IOException {
  }
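  /*
   * A hedged sketch of recoverTask, assuming the committer can locate output
   * left behind by the previous application attempt from the attempt id. The
   * recommitFromPreviousAttempt helper and the recovery layout are
   * illustrative, not part of this API:
   *
   *   @Override
   *   public void recoverTask(TaskAttemptContext context) throws IOException {
   *     int attemptId = context.getConfiguration()
   *         .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   *     if (attemptId > 0) {
   *       // Promote output committed by the previous AM attempt, if any.
   *       recommitFromPreviousAttempt(context, attemptId - 1);  // hypothetical
   *     }
   *   }
   */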
  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext)
      throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext,
   *             org.apache.hadoop.mapreduce.JobStatus.State)} instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
      org.apache.hadoop.mapreduce.JobStatus.State runState)
      throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void setupTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final boolean needsTaskCommit(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void commitTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void abortTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void recoverTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

}
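/*
 * A hedged sketch of wiring a custom committer into an old-API job via
 * JobConf.setOutputCommitter, which is a real method on
 * org.apache.hadoop.mapred.JobConf. MyOutputCommitter is a hypothetical
 * subclass of the class above:
 *
 *   JobConf conf = new JobConf();
 *   conf.setOutputCommitter(MyOutputCommitter.class);
 */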