/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 *   <li>
 *   Set up the job during initialization. For example, create the temporary
 *   output directory for the job during the initialization of the job.
 *   </li>
 *   <li>
 *   Clean up the job after job completion. For example, remove the
 *   temporary output directory after job completion.
 *   </li>
 *   <li>
 *   Set up the task's temporary output.
 *   </li>
 *   <li>
 *   Check whether a task needs a commit. This avoids the commit procedure
 *   for a task that does not need one.
 *   </li>
 *   <li>
 *   Commit the task output.
 *   </li>
 *   <li>
 *   Discard the task commit.
 *   </li>
 * </ol>
 * The methods in this class can be called from several different processes and
 * from several different contexts. It is important to know which process and
 * which context each method is called from; each method should be marked
 * accordingly in its documentation. It is also important to note that not all
 * methods are guaranteed to be called once and only once. If a method is not
 * guaranteed to have this property, the output committer needs to handle that
 * appropriately. Also note that it is only in rare situations that a method
 * may be called multiple times for the same task.
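 *
 * <p>For illustration, a minimal concrete committer might look like the
 * following sketch. <code>SketchCommitter</code> is a hypothetical name, and
 * the directory handling is left as comments because the layout is
 * implementation-specific; only the five abstract methods shown here are
 * required by this class:</p>
 * <pre>{@code
 * public class SketchCommitter extends OutputCommitter {
 *   public void setupJob(JobContext context) throws IOException {
 *     // e.g. create a job-wide temporary directory
 *   }
 *   public void setupTask(TaskAttemptContext context) throws IOException {
 *     // e.g. create a per-attempt temporary directory
 *   }
 *   public boolean needsTaskCommit(TaskAttemptContext context)
 *       throws IOException {
 *     return true; // always commit; real committers often check for output
 *   }
 *   public void commitTask(TaskAttemptContext context) throws IOException {
 *     // e.g. rename the attempt's temporary output into place
 *   }
 *   public void abortTask(TaskAttemptContext context) throws IOException {
 *     // e.g. delete the attempt's temporary output
 *   }
 * }
 * }</pre>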
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
   * For the framework to set up the job output during initialization. This is
   * called from the application master process for the entire job. It may be
   * called multiple times, once per job attempt.
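   *
   * <p>A sketch of a typical override, assuming the job stages output under a
   * temporary directory below the configured output path (the
   * <code>_temporary</code> layout here is an illustrative assumption):</p>
   * <pre>{@code
   * public void setupJob(JobContext context) throws IOException {
   *   JobConf conf = context.getJobConf();
   *   Path tmp = new Path(FileOutputFormat.getOutputPath(conf), "_temporary");
   *   tmp.getFileSystem(conf).mkdirs(tmp);
   * }
   * }</pre>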
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion. This is called
   * from the application master process for the entire job. This may be called
   * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job output could not be cleaned up
   * @deprecated Use {@link #commitJob(JobContext)} or
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked for jobs whose final run state is SUCCESSFUL. This is
   * called from the application master process for the entire job. It is
   * guaranteed to be called only once. If it throws an exception, the entire
   * job will fail.
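   *
   * <p>A sketch of a common override that also drops a completion marker next
   * to the final output (the <code>_DONE</code> marker name is an illustrative
   * assumption):</p>
   * <pre>{@code
   * public void commitJob(JobContext context) throws IOException {
   *   super.commitJob(context); // default behavior: runs cleanupJob()
   *   JobConf conf = context.getJobConf();
   *   Path marker = new Path(FileOutputFormat.getOutputPath(conf), "_DONE");
   *   marker.getFileSystem(conf).create(marker).close();
   * }
   * }</pre>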
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job output could not be committed
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs whose final run state is {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. This may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job, one of
   *               {@link JobStatus#FAILED} or {@link JobStatus#KILLED}
   * @throws IOException if the job output could not be aborted
   */
  public void abortJob(JobContext jobContext, int status)
      throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task.
   * This may be called multiple times for the same task, but for different
   * task attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the task output could not be set up
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit. This is called from each individual
   * task's process that will output to HDFS, and it is called just for that
   * task.
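   *
   * <p>A sketch of a typical implementation that skips the commit phase for
   * attempts that wrote nothing (<code>getAttemptTempPath</code> is a
   * hypothetical helper, not part of this API):</p>
   * <pre>{@code
   * public boolean needsTaskCommit(TaskAttemptContext context)
   *     throws IOException {
   *   Path attemptTmp = getAttemptTempPath(context); // hypothetical helper
   *   FileSystem fs = attemptTmp.getFileSystem(context.getJobConf());
   *   return fs.exists(attemptTmp);
   * }
   * }</pre>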
   *
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task output should be committed,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to the final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * task is the task that the AM determines finished first, this method
   * is called to commit an individual task's output. This is to mark
   * that task's output as complete, as {@link #commitJob(JobContext)} will
   * also be called later on if the entire job finished successfully. This
   * is called from a task's process. This may be called multiple times for the
   * same task, but for different task attempts. It should be very rare for this
   * to be called multiple times, and it requires odd networking failures to
   * make this happen. In the future the Hadoop framework may eliminate this
   * race.
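   *
   * <p>A sketch of a typical implementation, promoting the attempt's temporary
   * directory to the final location with a rename (both path helpers are
   * hypothetical names, not part of this API):</p>
   * <pre>{@code
   * public void commitTask(TaskAttemptContext context) throws IOException {
   *   Path attemptTmp = getAttemptTempPath(context); // hypothetical helper
   *   Path finalPath = getFinalTaskPath(context);    // hypothetical helper
   *   FileSystem fs = attemptTmp.getFileSystem(context.getJobConf());
   *   if (fs.exists(attemptTmp) && !fs.rename(attemptTmp, finalPath)) {
   *     throw new IOException("Could not commit " + attemptTmp);
   *   }
   * }
   * }</pre>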
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Discard the task output. This is called from a task's process to clean
   * up a single task's output that has not yet been committed. This may be
   * called multiple times for the same task, but for different task attempts.
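   *
   * <p>A sketch of a typical implementation, deleting the attempt's temporary
   * output (<code>getAttemptTempPath</code> is a hypothetical helper):</p>
   * <pre>{@code
   * public void abortTask(TaskAttemptContext context) throws IOException {
   *   Path attemptTmp = getAttemptTempPath(context); // hypothetical helper
   *   attemptTmp.getFileSystem(context.getJobConf()).delete(attemptTmp, true);
   * }
   * }</pre>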
   *
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if the task output could not be discarded
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Is task output recovery supported for restarting jobs? By default,
   * recovery is reported as unsupported.
   *
   * @return <code>true</code> if task output recovery is supported,
   *         <code>false</code> otherwise
   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
   */
  @Deprecated
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Is task output recovery supported for restarting jobs?
   *
   * If task output recovery is supported, job restart can be done more
   * efficiently.
   *
   * @param jobContext Context of the job whose output is being written.
   * @return <code>true</code> if task output recovery is supported,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   * @see #recoverTask(TaskAttemptContext)
   */
  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
    return isRecoverySupported();
  }

  /**
   * Recover the task output.
   *
   * The retry-count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   *
   * If an exception is thrown, the task will be attempted again.
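   *
   * <p>A sketch of a recovery override that reads the attempt id named above
   * and re-promotes output committed by the previous application attempt
   * (<code>promoteFromPreviousAttempt</code> is a hypothetical helper):</p>
   * <pre>{@code
   * public void recoverTask(TaskAttemptContext context) throws IOException {
   *   int attemptId = context.getConfiguration()
   *       .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   *   // move output committed under attemptId - 1 into this attempt's
   *   // expected location
   *   promoteFromPreviousAttempt(context, attemptId - 1);
   * }
   * }</pre>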
   *
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext)
      throws IOException {
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                             ) throws IOException {
    setupJob((JobContext) jobContext);
  }
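
  // A note on wiring: jobs using this old API register a committer through
  // JobConf.setOutputCommitter, e.g. (MyCommitter is a hypothetical
  // subclass of this class):
  //
  //   JobConf conf = new JobConf();
  //   conf.setOutputCommitter(MyCommitter.class);
  //
  // The framework then calls the new-API entry points, and the final bridge
  // overrides in this class cast the contexts back to their old-API types.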

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
   *             instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                               ) throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
                              ) throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
      org.apache.hadoop.mapreduce.JobStatus.State runState)
      throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state: " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

277
278 /**
279 * This method implements the new interface by calling the old method. Note
280 * that the input types are different between the new and old apis and this
281 * is a bridge between the two.
282 */
283 @Override
284 public final
285 void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
286 ) throws IOException {
287 setupTask((TaskAttemptContext) taskContext);
288 }
289
290 /**
291 * This method implements the new interface by calling the old method. Note
292 * that the input types are different between the new and old apis and this
293 * is a bridge between the two.
294 */
295 @Override
296 public final boolean
297 needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
298 ) throws IOException {
299 return needsTaskCommit((TaskAttemptContext) taskContext);
300 }
301
302 /**
303 * This method implements the new interface by calling the old method. Note
304 * that the input types are different between the new and old apis and this
305 * is a bridge between the two.
306 */
307 @Override
308 public final
309 void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
310 ) throws IOException {
311 commitTask((TaskAttemptContext) taskContext);
312 }
313
314 /**
315 * This method implements the new interface by calling the old method. Note
316 * that the input types are different between the new and old apis and this
317 * is a bridge between the two.
318 */
319 @Override
320 public final
321 void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
322 ) throws IOException {
323 abortTask((TaskAttemptContext) taskContext);
324 }
325
326 /**
327 * This method implements the new interface by calling the old method. Note
328 * that the input types are different between the new and old apis and this
329 * is a bridge between the two.
330 */
331 @Override
332 public final
333 void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
334 ) throws IOException {
335 recoverTask((TaskAttemptContext) taskContext);
336 }
337
338 /**
339 * This method implements the new interface by calling the old method. Note
340 * that the input types are different between the new and old apis and this is
341 * a bridge between the two.
342 */
343 @Override
344 public final boolean isRecoverySupported(
345 org.apache.hadoop.mapreduce.JobContext context) throws IOException {
346 return isRecoverySupported((JobContext) context);
347 }
348
349 }