/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 * <li>
 * Set up the job during initialization. For example, create the temporary
 * output directory for the job during job initialization.
 * </li>
 * <li>
 * Clean up the job after job completion. For example, remove the
 * temporary output directory once the job completes.
 * </li>
 * <li>
 * Set up the task's temporary output.
 * </li>
 * <li>
 * Check whether a task needs a commit, so the commit procedure can be
 * skipped for tasks that do not need one.
 * </li>
 * <li>
 * Commit the task output.
 * </li>
 * <li>
 * Discard the task commit.
 * </li>
 * </ol>
 * The methods in this class can be called from several different processes and
 * from several different contexts. It is important to know which process and
 * which context each is called from, so each method should be marked
 * accordingly in its documentation. It is also important to note that not all
 * methods are guaranteed to be called once and only once. If a method is not
 * guaranteed to have this property the output committer needs to handle this
 * appropriately. Also note that only in rare situations may they be called
 * multiple times for the same task.
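 *
 * <p>For illustration only, a minimal committer might look like the
 * following sketch. The class name is hypothetical and the comments stand
 * in for real filesystem work; {@link FileOutputCommitter} is the standard
 * concrete implementation:</p>
 * <pre>
 * public class SketchCommitter extends OutputCommitter {
 *   public void setupJob(JobContext jobContext) throws IOException {
 *     // e.g. create the job's temporary output directory
 *   }
 *   public void setupTask(TaskAttemptContext taskContext) throws IOException {
 *     // e.g. create a per-attempt temporary directory
 *   }
 *   public boolean needsTaskCommit(TaskAttemptContext taskContext)
 *       throws IOException {
 *     return true; // commit whenever the attempt produced output
 *   }
 *   public void commitTask(TaskAttemptContext taskContext) throws IOException {
 *     // e.g. promote the attempt's temporary output to the final location
 *   }
 *   public void abortTask(TaskAttemptContext taskContext) throws IOException {
 *     // e.g. delete the attempt's temporary output
 *   }
 * }
 * </pre>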
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
   * For the framework to set up the job output during initialization. This is
   * called from the application master process for the entire job. It will be
   * called multiple times, once per job attempt.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion. This is called
   * from the application master process for the entire job. It may be called
   * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked only for jobs whose final runstate is SUCCESSFUL. This is
   * called from the application master process for the entire job and is
   * guaranteed to be called exactly once. If it throws an exception the entire
   * job will fail.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs whose final runstate is {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. It may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final runstate of the job
   * @throws IOException if the abort fails
   */
  public void abortJob(JobContext jobContext, int status)
      throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task. It
   * may be called multiple times for the same task, but for different task
   * attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if task setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit. This is called from each individual
   * task's process that will output to HDFS, and it is called just for that
   * task.
   *
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task output should be committed,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to its final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * attempt is the one that the AM determines finished first, this method
   * is called to commit that individual task's output, marking the task's
   * output as complete; {@link #commitJob(JobContext)} will still be called
   * later if the entire job finishes successfully. This is called from a
   * task's process. It may be called multiple times for the same task, but
   * for different task attempts. It should be very rare for this to be
   * called multiple times, and it requires odd networking failures to make
   * this happen. In the future the Hadoop framework may eliminate this race.
   *
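   * <p>For illustration only, a promotion step might look like the following
   * sketch; the paths are hypothetical and rename-based promotion is an
   * assumption, not a requirement of the API:</p>
   * <pre>
   * FileSystem fs = FileSystem.get(taskContext.getJobConf());
   * if (!fs.rename(attemptTmpDir, finalOutputDir)) { // hypothetical paths
   *   throw new IOException("could not promote task output");
   * }
   * </pre>
   *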
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the commit is not successful; may be retried
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Discard the task output. This is called from a task's process to clean
   * up a single task's output that has not yet been committed. It may be
   * called multiple times for the same task, but for different task attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the abort fails
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * This method implements the new interface. Recovery is not supported by
   * default for the old API; subclasses that override
   * {@link #recoverTask(TaskAttemptContext)} should also override this method
   * to return <code>true</code>.
   */
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Recover the task output.
   *
   * The retry-count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   *
   * If an exception is thrown the task will be attempted again.
   *
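   * <p>A sketch of reading the retry-count inside an implementation; the
   * default value of 0 here is an assumption for illustration:</p>
   * <pre>
   * int appAttempt = taskContext.getConfiguration().getInt(
   *     MRConstants.APPLICATION_ATTEMPT_ID, 0);
   * </pre>
   *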
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext)
      throws IOException {
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext)
      throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext,
   *             org.apache.hadoop.mapreduce.JobStatus.State)} instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
      org.apache.hadoop.mapreduce.JobStatus.State runState)
      throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean needsTaskCommit(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void recoverTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

}