/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 * <li>
 * Set up the job during initialization. For example, create the temporary
 * output directory for the job during the initialization of the job.
 * </li>
 * <li>
 * Clean up the job after job completion. For example, remove the
 * temporary output directory after job completion.
 * </li>
 * <li>
 * Set up the task's temporary output.
 * </li>
 * <li>
 * Check whether a task needs a commit. This is to avoid the commit
 * procedure if a task does not need a commit.
 * </li>
 * <li>
 * Commit the task output.
 * </li>
 * <li>
 * Discard the task commit.
 * </li>
 * </ol>
 * The methods in this class can be called from several different processes
 * and from several different contexts. It is important to know which process
 * and which context each is called from, so each method should be marked
 * accordingly in its documentation. It is also important to note that not all
 * methods are guaranteed to be called once and only once. If a method is not
 * guaranteed to have this property, the output committer needs to handle that
 * appropriately. Also note that it is only in rare situations that they may
 * be called multiple times for the same task.
 *
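 * <p>A minimal sketch of a custom committer, for illustration only; the
 * <code>SketchCommitter</code> name and the directory layout implied by the
 * comments are assumptions, not part of this contract:</p>
 * <pre>{@code
 * public class SketchCommitter extends OutputCommitter {
 *   public void setupJob(JobContext jobContext) throws IOException {
 *     // e.g. create a temporary work directory for the job
 *   }
 *   public void setupTask(TaskAttemptContext taskContext) throws IOException {
 *     // e.g. create a per-attempt work directory
 *   }
 *   public boolean needsTaskCommit(TaskAttemptContext taskContext)
 *       throws IOException {
 *     return true; // in this sketch every attempt asks for a commit
 *   }
 *   public void commitTask(TaskAttemptContext taskContext) throws IOException {
 *     // promote the attempt's work directory to the final output location
 *   }
 *   public void abortTask(TaskAttemptContext taskContext) throws IOException {
 *     // delete the attempt's work directory
 *   }
 * }
 *
 * // Wiring it into a job:
 * JobConf conf = new JobConf();
 * conf.setOutputCommitter(SketchCommitter.class);
 * }</pre>
 *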
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {

  /**
   * For the framework to set up the job output during initialization. This is
   * called from the application master process for the entire job. This will
   * be called multiple times, once per job attempt.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion. This is called
   * from the application master process for the entire job. This may be
   * called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note
   * that this is invoked for jobs with a final runstate of SUCCESSFUL. This
   * is called from the application master process for the entire job. It is
   * guaranteed to only be called once. If it throws an exception the entire
   * job will fail.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs with a final runstate of {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}. This is called from the application
   * master process for the entire job. This may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final runstate of the job
   * @throws IOException if the abort fails
   */
  public void abortJob(JobContext jobContext, int status)
      throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task. This is called from each individual task's
   * process that will output to HDFS, and it is called just for that task.
   * This may be called multiple times for the same task, but for different
   * task attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if task setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit. This is called from each
   * individual task's process that will output to HDFS, and it is called
   * just for that task.
   *
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task needs a commit,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to its final output location.
   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
   * task is the task that the AM determines finished first, this method
   * is called to commit an individual task's output. This is to mark that
   * task's output as complete; {@link #commitJob(JobContext)} will also be
   * called later on if the entire job finishes successfully. This is called
   * from a task's process. This may be called multiple times for the same
   * task, but for different task attempts. It should be very rare for this
   * to be called multiple times, and it requires odd networking failures to
   * make this happen. In the future the Hadoop framework may eliminate this
   * race.
   *
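   * <p>A sketch of a typical commit, assuming a per-attempt temporary
   * directory; <code>workPath</code> and <code>outputPath</code> are
   * hypothetical fields of the committer, not part of this API:</p>
   * <pre>{@code
   * public void commitTask(TaskAttemptContext taskContext) throws IOException {
   *   // Promote the attempt's temporary output to the final location.
   *   Path attemptDir =
   *       new Path(workPath, taskContext.getTaskAttemptID().toString());
   *   FileSystem fs = attemptDir.getFileSystem(taskContext.getJobConf());
   *   if (fs.exists(attemptDir)) {
   *     fs.rename(attemptDir, outputPath); // a single rename keeps commit cheap
   *   }
   * }
   * }</pre>
   *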
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Discard the task output. This is called from a task's process to clean
   * up a single task's output that has not yet been committed. This may be
   * called multiple times for the same task, but for different task attempts.
   *
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if the discard fails
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   *
   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
   */
  @Deprecated
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Is task output recovery supported for restarting jobs?
   *
   * If task output recovery is supported, job restart can be done more
   * efficiently.
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @return <code>true</code> if task output recovery is supported,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   * @see #recoverTask(TaskAttemptContext)
   */
  public boolean isRecoverySupported(JobContext jobContext)
      throws IOException {
    return isRecoverySupported();
  }

  /**
   * Returns true if an in-progress job commit can be retried. If the MR AM is
   * re-run then it will check this value to determine if it can retry an
   * in-progress commit that was started by a previous version.
   * Note that in rare scenarios, the previous AM version might still be
   * running at that time, due to system anomalies. Hence if this method
   * returns true then the retry commit operation should be able to run
   * concurrently with the previous operation.
   *
   * If repeatable job commit is supported, job restart can tolerate previous
   * AM failures during job commit.
   *
   * By default, it is not supported. Extended classes (like
   * FileOutputCommitter) should explicitly override it if they provide
   * support.
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @return <code>true</code> if repeatable job commit is supported,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public boolean isCommitJobRepeatable(JobContext jobContext)
      throws IOException {
    return false;
  }

  @Override
  public boolean isCommitJobRepeatable(org.apache.hadoop.mapreduce.JobContext
      jobContext) throws IOException {
    return isCommitJobRepeatable((JobContext) jobContext);
  }

  /**
   * Recover the task output.
   *
   * The retry-count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>. This is called from the application master
   * process, but it is called individually for each task.
   *
   * If an exception is thrown the task will be attempted again.
   *
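   * <p>For example, a committer may read the attempt number when deciding
   * what to recover; a sketch, where treating the value as an int with a
   * default of 0 is an assumption:</p>
   * <pre>{@code
   * int appAttemptId = taskContext.getConfiguration()
   *     .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   * // inspect output from attempt (appAttemptId - 1) and re-commit if intact
   * }</pre>
   *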
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the recovery fails
   */
  public void recoverTask(TaskAttemptContext taskContext)
      throws IOException {
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext)
      throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext,
   *             org.apache.hadoop.mapreduce.JobStatus.State)} instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
      org.apache.hadoop.mapreduce.JobStatus.State runState)
      throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void setupTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean needsTaskCommit(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void commitTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void abortTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }
  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final void recoverTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old apis and this
   * is a bridge between the two.
   */
  @Override
  public final boolean isRecoverySupported(
      org.apache.hadoop.mapreduce.JobContext context) throws IOException {
    return isRecoverySupported((JobContext) context);
  }
}