001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.IOException; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 026 /** 027 * <code>OutputCommitter</code> describes the commit of task output for a 028 * Map-Reduce job. 029 * 030 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 031 * the job to:<p> 032 * <ol> 033 * <li> 034 * Setup the job during initialization. For example, create the temporary 035 * output directory for the job during the initialization of the job. 036 * </li> 037 * <li> 038 * Cleanup the job after the job completion. For example, remove the 039 * temporary output directory after the job completion. 040 * </li> 041 * <li> 042 * Setup the task temporary output. 043 * </li> 044 * <li> 045 * Check whether a task needs a commit. This is to avoid the commit 046 * procedure if a task does not need commit. 047 * </li> 048 * <li> 049 * Commit of the task output. 050 * </li> 051 * <li> 052 * Discard the task commit. 053 * </li> 054 * </ol> 055 * The methods in this class can be called from several different processes and 056 * from several different contexts. It is important to know which process and 057 * which context each is called from. Each method should be marked accordingly 058 * in its documentation. It is also important to note that not all methods are 059 * guaranteed to be called once and only once. If a method is not guaranteed to 060 * have this property the output committer needs to handle this appropriately. 061 * Also note it will only be in rare situations where they may be called 062 * multiple times for the same task. 063 * 064 * @see FileOutputCommitter 065 * @see JobContext 066 * @see TaskAttemptContext 067 */ 068 @InterfaceAudience.Public 069 @InterfaceStability.Stable 070 public abstract class OutputCommitter 071 extends org.apache.hadoop.mapreduce.OutputCommitter { 072 /** 073 * For the framework to setup the job output during initialization. This is 074 * called from the application master process for the entire job. This will be 075 * called multiple times, once per job attempt. 076 * 077 * @param jobContext Context of the job whose output is being written. 078 * @throws IOException if temporary output could not be created 079 */ 080 public abstract void setupJob(JobContext jobContext) throws IOException; 081 082 /** 083 * For cleaning up the job's output after job completion. This is called 084 * from the application master process for the entire job. This may be called 085 * multiple times. 086 * 087 * @param jobContext Context of the job whose output is being written. 088 * @throws IOException 089 * @deprecated Use {@link #commitJob(JobContext)} or 090 * {@link #abortJob(JobContext, int)} instead. 091 */ 092 @Deprecated 093 public void cleanupJob(JobContext jobContext) throws IOException { } 094 095 /** 096 * For committing job's output after successful job completion. Note that this 097 * is invoked for jobs with final runstate as SUCCESSFUL. This is called 098 * from the application master process for the entire job. This is guaranteed 099 * to only be called once. If it throws an exception the entire job will 100 * fail. 101 * 102 * @param jobContext Context of the job whose output is being written. 103 * @throws IOException 104 */ 105 public void commitJob(JobContext jobContext) throws IOException { 106 cleanupJob(jobContext); 107 } 108 109 /** 110 * For aborting an unsuccessful job's output. Note that this is invoked for 111 * jobs with final runstate as {@link JobStatus#FAILED} or 112 * {@link JobStatus#KILLED}. This is called from the application 113 * master process for the entire job. This may be called multiple times. 114 * 115 * @param jobContext Context of the job whose output is being written. 116 * @param status final runstate of the job 117 * @throws IOException 118 */ 119 public void abortJob(JobContext jobContext, int status) 120 throws IOException { 121 cleanupJob(jobContext); 122 } 123 124 /** 125 * Sets up output for the task. This is called from each individual task's 126 * process that will output to HDFS, and it is called just for that task. This 127 * may be called multiple times for the same task, but for different task 128 * attempts. 129 * 130 * @param taskContext Context of the task whose output is being written. 131 * @throws IOException 132 */ 133 public abstract void setupTask(TaskAttemptContext taskContext) 134 throws IOException; 135 136 /** 137 * Check whether task needs a commit. This is called from each individual 138 * task's process that will output to HDFS, and it is called just for that 139 * task. 140 * 141 * @param taskContext 142 * @return true/false 143 * @throws IOException 144 */ 145 public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) 146 throws IOException; 147 148 /** 149 * To promote the task's temporary output to final output location. 150 * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this 151 * task is the task that the AM determines finished first, this method 152 * is called to commit an individual task's output. This is to mark 153 * that tasks output as complete, as {@link #commitJob(JobContext)} will 154 * also be called later on if the entire job finished successfully. This 155 * is called from a task's process. This may be called multiple times for the 156 * same task, but different task attempts. It should be very rare for this to 157 * be called multiple times and requires odd networking failures to make this 158 * happen. In the future the Hadoop framework may eliminate this race. 159 * 160 * @param taskContext Context of the task whose output is being written. 161 * @throws IOException if commit is not 162 */ 163 public abstract void commitTask(TaskAttemptContext taskContext) 164 throws IOException; 165 166 /** 167 * Discard the task output. This is called from a task's process to clean 168 * up a single task's output that can not yet been committed. This may be 169 * called multiple times for the same task, but for different task attempts. 170 * 171 * @param taskContext 172 * @throws IOException 173 */ 174 public abstract void abortTask(TaskAttemptContext taskContext) 175 throws IOException; 176 177 /** 178 * This method implements the new interface by calling the old method. Note 179 * that the input types are different between the new and old apis and this is 180 * a bridge between the two. 181 * 182 * @deprecated Use {@link #isRecoverySupported(JobContext)} instead. 183 */ 184 @Deprecated 185 @Override 186 public boolean isRecoverySupported() { 187 return false; 188 } 189 190 /** 191 * Is task output recovery supported for restarting jobs? 192 * 193 * If task output recovery is supported, job restart can be done more 194 * efficiently. 195 * 196 * @param jobContext 197 * Context of the job whose output is being written. 198 * @return <code>true</code> if task output recovery is supported, 199 * <code>false</code> otherwise 200 * @throws IOException 201 * @see #recoverTask(TaskAttemptContext) 202 */ 203 public boolean isRecoverySupported(JobContext jobContext) throws IOException { 204 return isRecoverySupported(); 205 } 206 207 /** 208 * Recover the task output. 209 * 210 * The retry-count for the job will be passed via the 211 * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in 212 * {@link TaskAttemptContext#getConfiguration()} for the 213 * <code>OutputCommitter</code>. This is called from the application master 214 * process, but it is called individually for each task. 215 * 216 * If an exception is thrown the task will be attempted again. 217 * 218 * @param taskContext Context of the task whose output is being recovered 219 * @throws IOException 220 */ 221 public void recoverTask(TaskAttemptContext taskContext) 222 throws IOException { 223 } 224 225 /** 226 * This method implements the new interface by calling the old method. Note 227 * that the input types are different between the new and old apis and this 228 * is a bridge between the two. 229 */ 230 @Override 231 public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext 232 ) throws IOException { 233 setupJob((JobContext) jobContext); 234 } 235 236 /** 237 * This method implements the new interface by calling the old method. Note 238 * that the input types are different between the new and old apis and this 239 * is a bridge between the two. 240 * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)} 241 * or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)} 242 * instead. 243 */ 244 @Override 245 @Deprecated 246 public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context 247 ) throws IOException { 248 cleanupJob((JobContext) context); 249 } 250 251 /** 252 * This method implements the new interface by calling the old method. Note 253 * that the input types are different between the new and old apis and this 254 * is a bridge between the two. 255 */ 256 @Override 257 public final void commitJob(org.apache.hadoop.mapreduce.JobContext context 258 ) throws IOException { 259 commitJob((JobContext) context); 260 } 261 262 /** 263 * This method implements the new interface by calling the old method. Note 264 * that the input types are different between the new and old apis and this 265 * is a bridge between the two. 266 */ 267 @Override 268 public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 269 org.apache.hadoop.mapreduce.JobStatus.State runState) 270 throws IOException { 271 int state = JobStatus.getOldNewJobRunState(runState); 272 if (state != JobStatus.FAILED && state != JobStatus.KILLED) { 273 throw new IOException ("Invalid job run state : " + runState.name()); 274 } 275 abortJob((JobContext) context, state); 276 } 277 278 /** 279 * This method implements the new interface by calling the old method. Note 280 * that the input types are different between the new and old apis and this 281 * is a bridge between the two. 282 */ 283 @Override 284 public final 285 void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 286 ) throws IOException { 287 setupTask((TaskAttemptContext) taskContext); 288 } 289 290 /** 291 * This method implements the new interface by calling the old method. Note 292 * that the input types are different between the new and old apis and this 293 * is a bridge between the two. 294 */ 295 @Override 296 public final boolean 297 needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 298 ) throws IOException { 299 return needsTaskCommit((TaskAttemptContext) taskContext); 300 } 301 302 /** 303 * This method implements the new interface by calling the old method. Note 304 * that the input types are different between the new and old apis and this 305 * is a bridge between the two. 306 */ 307 @Override 308 public final 309 void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 310 ) throws IOException { 311 commitTask((TaskAttemptContext) taskContext); 312 } 313 314 /** 315 * This method implements the new interface by calling the old method. Note 316 * that the input types are different between the new and old apis and this 317 * is a bridge between the two. 318 */ 319 @Override 320 public final 321 void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 322 ) throws IOException { 323 abortTask((TaskAttemptContext) taskContext); 324 } 325 326 /** 327 * This method implements the new interface by calling the old method. Note 328 * that the input types are different between the new and old apis and this 329 * is a bridge between the two. 330 */ 331 @Override 332 public final 333 void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 334 ) throws IOException { 335 recoverTask((TaskAttemptContext) taskContext); 336 } 337 338 /** 339 * This method implements the new interface by calling the old method. Note 340 * that the input types are different between the new and old apis and this is 341 * a bridge between the two. 342 */ 343 @Override 344 public final boolean isRecoverySupported( 345 org.apache.hadoop.mapreduce.JobContext context) throws IOException { 346 return isRecoverySupported((JobContext) context); 347 } 348 349 }