001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    
026    /**
027     * <code>OutputCommitter</code> describes the commit of task output for a 
028     * Map-Reduce job.
029     *
030     * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 
031     * the job to:<p>
032     * <ol>
033     *   <li>
034     *   Setup the job during initialization. For example, create the temporary 
035     *   output directory for the job during the initialization of the job.
036     *   </li>
037     *   <li>
038     *   Cleanup the job after the job completion. For example, remove the
039     *   temporary output directory after the job completion. 
040     *   </li>
041     *   <li>
042     *   Setup the task temporary output.
043     *   </li> 
044     *   <li>
045     *   Check whether a task needs a commit. This is to avoid the commit
046     *   procedure if a task does not need commit.
047     *   </li>
048     *   <li>
049     *   Commit of the task output.
050     *   </li>  
051     *   <li>
052     *   Discard the task commit.
053     *   </li>
054     * </ol>
055     * The methods in this class can be called from several different processes and
056     * from several different contexts.  It is important to know which process and
057     * which context each is called from.  Each method should be marked accordingly
058     * in its documentation.  It is also important to note that not all methods are
059     * guaranteed to be called once and only once.  If a method is not guaranteed to
060     * have this property the output committer needs to handle this appropriately. 
061     * Also note it will only be in rare situations where they may be called 
062     * multiple times for the same task.
063     * 
064     * @see FileOutputCommitter 
065     * @see JobContext
066     * @see TaskAttemptContext 
067     */
068    @InterfaceAudience.Public
069    @InterfaceStability.Stable
070    public abstract class OutputCommitter 
071                    extends org.apache.hadoop.mapreduce.OutputCommitter {
072      /**
073       * For the framework to setup the job output during initialization.  This is
074       * called from the application master process for the entire job. This will be
075       * called multiple times, once per job attempt.
076       * 
077       * @param jobContext Context of the job whose output is being written.
078       * @throws IOException if temporary output could not be created
079       */
080      public abstract void setupJob(JobContext jobContext) throws IOException;
081    
082      /**
083       * For cleaning up the job's output after job completion.  This is called
084       * from the application master process for the entire job. This may be called
085       * multiple times.
086       * 
087       * @param jobContext Context of the job whose output is being written.
088       * @throws IOException
089       * @deprecated Use {@link #commitJob(JobContext)} or 
090       *                 {@link #abortJob(JobContext, int)} instead.
091       */
092      @Deprecated
093      public void cleanupJob(JobContext jobContext) throws IOException { }
094    
095      /**
096       * For committing job's output after successful job completion. Note that this
097       * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
098       * from the application master process for the entire job. This is guaranteed
099       * to only be called once.  If it throws an exception the entire job will
100       * fail.
101       * 
102       * @param jobContext Context of the job whose output is being written.
103       * @throws IOException 
104       */
105      public void commitJob(JobContext jobContext) throws IOException {
106        cleanupJob(jobContext);
107      }
108      
109      /**
110       * For aborting an unsuccessful job's output. Note that this is invoked for 
111       * jobs with final runstate as {@link JobStatus#FAILED} or 
112       * {@link JobStatus#KILLED}. This is called from the application
113       * master process for the entire job. This may be called multiple times.
114       * 
115       * @param jobContext Context of the job whose output is being written.
116       * @param status final runstate of the job
117       * @throws IOException
118       */
119      public void abortJob(JobContext jobContext, int status) 
120      throws IOException {
121        cleanupJob(jobContext);
122      }
123      
124      /**
125       * Sets up output for the task. This is called from each individual task's
126       * process that will output to HDFS, and it is called just for that task. This
127       * may be called multiple times for the same task, but for different task
128       * attempts.
129       * 
130       * @param taskContext Context of the task whose output is being written.
131       * @throws IOException
132       */
133      public abstract void setupTask(TaskAttemptContext taskContext)
134      throws IOException;
135      
136      /**
137       * Check whether task needs a commit.  This is called from each individual
138       * task's process that will output to HDFS, and it is called just for that
139       * task.
140       * 
141       * @param taskContext
142       * @return true/false
143       * @throws IOException
144       */
145      public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
146      throws IOException;
147    
148      /**
149       * To promote the task's temporary output to final output location.
150       * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
151       * task is the task that the AM determines finished first, this method
152       * is called to commit an individual task's output.  This is to mark
153       * that tasks output as complete, as {@link #commitJob(JobContext)} will 
154       * also be called later on if the entire job finished successfully. This
155       * is called from a task's process. This may be called multiple times for the
156       * same task, but different task attempts.  It should be very rare for this to
157       * be called multiple times and requires odd networking failures to make this
158       * happen. In the future the Hadoop framework may eliminate this race.
159       * 
160       * @param taskContext Context of the task whose output is being written.
161       * @throws IOException if commit is not 
162       */
163      public abstract void commitTask(TaskAttemptContext taskContext)
164      throws IOException;
165      
166      /**
167       * Discard the task output. This is called from a task's process to clean 
168       * up a single task's output that can not yet been committed. This may be
169       * called multiple times for the same task, but for different task attempts.
170       * 
171       * @param taskContext
172       * @throws IOException
173       */
174      public abstract void abortTask(TaskAttemptContext taskContext)
175      throws IOException;
176    
177      /**
178       * This method implements the new interface by calling the old method. Note
179       * that the input types are different between the new and old apis and this is
180       * a bridge between the two.
181       * 
182       * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
183       */
184      @Deprecated
185      @Override
186      public boolean isRecoverySupported() {
187        return false;
188      }
189    
190      /**
191       * Is task output recovery supported for restarting jobs?
192       * 
193       * If task output recovery is supported, job restart can be done more
194       * efficiently.
195       * 
196       * @param jobContext
197       *          Context of the job whose output is being written.
198       * @return <code>true</code> if task output recovery is supported,
199       *         <code>false</code> otherwise
200       * @throws IOException
201       * @see #recoverTask(TaskAttemptContext)
202       */
203      public boolean isRecoverySupported(JobContext jobContext) throws IOException {
204        return isRecoverySupported();
205      }
206    
207      /**
208       * Recover the task output. 
209       * 
210       * The retry-count for the job will be passed via the 
211       * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in  
212       * {@link TaskAttemptContext#getConfiguration()} for the 
213       * <code>OutputCommitter</code>. This is called from the application master
214       * process, but it is called individually for each task.
215       * 
216       * If an exception is thrown the task will be attempted again. 
217       * 
218       * @param taskContext Context of the task whose output is being recovered
219       * @throws IOException
220       */
221      public void recoverTask(TaskAttemptContext taskContext) 
222      throws IOException {
223      }
224      
225      /**
226       * This method implements the new interface by calling the old method. Note
227       * that the input types are different between the new and old apis and this
228       * is a bridge between the two.
229       */
230      @Override
231      public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
232                                 ) throws IOException {
233        setupJob((JobContext) jobContext);
234      }
235    
236      /**
237       * This method implements the new interface by calling the old method. Note
238       * that the input types are different between the new and old apis and this
239       * is a bridge between the two.
240       * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
241       *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
242       *             instead.
243       */
244      @Override
245      @Deprecated
246      public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
247                                   ) throws IOException {
248        cleanupJob((JobContext) context);
249      }
250    
251      /**
252       * This method implements the new interface by calling the old method. Note
253       * that the input types are different between the new and old apis and this
254       * is a bridge between the two.
255       */
256      @Override
257      public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
258                                 ) throws IOException {
259        commitJob((JobContext) context);
260      }
261      
262      /**
263       * This method implements the new interface by calling the old method. Note
264       * that the input types are different between the new and old apis and this
265       * is a bridge between the two.
266       */
267      @Override
268      public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
269                                       org.apache.hadoop.mapreduce.JobStatus.State runState) 
270      throws IOException {
271        int state = JobStatus.getOldNewJobRunState(runState);
272        if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
273          throw new IOException ("Invalid job run state : " + runState.name());
274        }
275        abortJob((JobContext) context, state);
276      }
277      
278      /**
279       * This method implements the new interface by calling the old method. Note
280       * that the input types are different between the new and old apis and this
281       * is a bridge between the two.
282       */
283      @Override
284      public final 
285      void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
286                     ) throws IOException {
287        setupTask((TaskAttemptContext) taskContext);
288      }
289      
290      /**
291       * This method implements the new interface by calling the old method. Note
292       * that the input types are different between the new and old apis and this
293       * is a bridge between the two.
294       */
295      @Override
296      public final boolean 
297        needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
298                        ) throws IOException {
299        return needsTaskCommit((TaskAttemptContext) taskContext);
300      }
301    
302      /**
303       * This method implements the new interface by calling the old method. Note
304       * that the input types are different between the new and old apis and this
305       * is a bridge between the two.
306       */
307      @Override
308      public final 
309      void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
310                      ) throws IOException {
311        commitTask((TaskAttemptContext) taskContext);
312      }
313      
314      /**
315       * This method implements the new interface by calling the old method. Note
316       * that the input types are different between the new and old apis and this
317       * is a bridge between the two.
318       */
319      @Override
320      public final 
321      void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
322                     ) throws IOException {
323        abortTask((TaskAttemptContext) taskContext);
324      }
325      
326      /**
327       * This method implements the new interface by calling the old method. Note
328       * that the input types are different between the new and old apis and this
329       * is a bridge between the two.
330       */
331      @Override
332      public final 
333      void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
334          ) throws IOException {
335        recoverTask((TaskAttemptContext) taskContext);
336      }
337    
338      /**
339       * This method implements the new interface by calling the old method. Note
340       * that the input types are different between the new and old apis and this is
341       * a bridge between the two.
342       */
343      @Override
344      public final boolean isRecoverySupported(
345          org.apache.hadoop.mapreduce.JobContext context) throws IOException {
346        return isRecoverySupported((JobContext) context);
347      }
348    
349    }