001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.IOException; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 026 /** 027 * <code>OutputCommitter</code> describes the commit of task output for a 028 * Map-Reduce job. 029 * 030 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of 031 * the job to:<p> 032 * <ol> 033 * <li> 034 * Setup the job during initialization. For example, create the temporary 035 * output directory for the job during the initialization of the job. 036 * </li> 037 * <li> 038 * Cleanup the job after the job completion. For example, remove the 039 * temporary output directory after the job completion. 040 * </li> 041 * <li> 042 * Setup the task temporary output. 043 * </li> 044 * <li> 045 * Check whether a task needs a commit. This is to avoid the commit 046 * procedure if a task does not need commit. 047 * </li> 048 * <li> 049 * Commit of the task output. 050 * </li> 051 * <li> 052 * Discard the task commit. 053 * </li> 054 * </ol> 055 * The methods in this class can be called from several different processes and 056 * from several different contexts. It is important to know which process and 057 * which context each is called from. Each method should be marked accordingly 058 * in its documentation. It is also important to note that not all methods are 059 * guaranteed to be called once and only once. If a method is not guaranteed to 060 * have this property the output committer needs to handle this appropriately. 061 * Also note it will only be in rare situations where they may be called 062 * multiple times for the same task. 063 * 064 * @see FileOutputCommitter 065 * @see JobContext 066 * @see TaskAttemptContext 067 */ 068 @InterfaceAudience.Public 069 @InterfaceStability.Stable 070 public abstract class OutputCommitter 071 extends org.apache.hadoop.mapreduce.OutputCommitter { 072 /** 073 * For the framework to setup the job output during initialization. This is 074 * called from the application master process for the entire job. This will be 075 * called multiple times, once per job attempt. 076 * 077 * @param jobContext Context of the job whose output is being written. 078 * @throws IOException if temporary output could not be created 079 */ 080 public abstract void setupJob(JobContext jobContext) throws IOException; 081 082 /** 083 * For cleaning up the job's output after job completion. This is called 084 * from the application master process for the entire job. This may be called 085 * multiple times. 086 * 087 * @param jobContext Context of the job whose output is being written. 088 * @throws IOException 089 * @deprecated Use {@link #commitJob(JobContext)} or 090 * {@link #abortJob(JobContext, int)} instead. 091 */ 092 @Deprecated 093 public void cleanupJob(JobContext jobContext) throws IOException { } 094 095 /** 096 * For committing job's output after successful job completion. Note that this 097 * is invoked for jobs with final runstate as SUCCESSFUL. This is called 098 * from the application master process for the entire job. This is guaranteed 099 * to only be called once. If it throws an exception the entire job will 100 * fail. 101 * 102 * @param jobContext Context of the job whose output is being written. 103 * @throws IOException 104 */ 105 public void commitJob(JobContext jobContext) throws IOException { 106 cleanupJob(jobContext); 107 } 108 109 /** 110 * For aborting an unsuccessful job's output. Note that this is invoked for 111 * jobs with final runstate as {@link JobStatus#FAILED} or 112 * {@link JobStatus#KILLED}. This is called from the application 113 * master process for the entire job. This may be called multiple times. 114 * 115 * @param jobContext Context of the job whose output is being written. 116 * @param status final runstate of the job 117 * @throws IOException 118 */ 119 public void abortJob(JobContext jobContext, int status) 120 throws IOException { 121 cleanupJob(jobContext); 122 } 123 124 /** 125 * Sets up output for the task. This is called from each individual task's 126 * process that will output to HDFS, and it is called just for that task. This 127 * may be called multiple times for the same task, but for different task 128 * attempts. 129 * 130 * @param taskContext Context of the task whose output is being written. 131 * @throws IOException 132 */ 133 public abstract void setupTask(TaskAttemptContext taskContext) 134 throws IOException; 135 136 /** 137 * Check whether task needs a commit. This is called from each individual 138 * task's process that will output to HDFS, and it is called just for that 139 * task. 140 * 141 * @param taskContext 142 * @return true/false 143 * @throws IOException 144 */ 145 public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) 146 throws IOException; 147 148 /** 149 * To promote the task's temporary output to final output location. 150 * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this 151 * task is the task that the AM determines finished first, this method 152 * is called to commit an individual task's output. This is to mark 153 * that tasks output as complete, as {@link #commitJob(JobContext)} will 154 * also be called later on if the entire job finished successfully. This 155 * is called from a task's process. This may be called multiple times for the 156 * same task, but different task attempts. It should be very rare for this to 157 * be called multiple times and requires odd networking failures to make this 158 * happen. In the future the Hadoop framework may eliminate this race. 159 * 160 * @param taskContext Context of the task whose output is being written. 161 * @throws IOException if commit is not 162 */ 163 public abstract void commitTask(TaskAttemptContext taskContext) 164 throws IOException; 165 166 /** 167 * Discard the task output. This is called from a task's process to clean 168 * up a single task's output that can not yet been committed. This may be 169 * called multiple times for the same task, but for different task attempts. 170 * 171 * @param taskContext 172 * @throws IOException 173 */ 174 public abstract void abortTask(TaskAttemptContext taskContext) 175 throws IOException; 176 177 /** 178 * This method implements the new interface by calling the old method. Note 179 * that the input types are different between the new and old apis and this 180 * is a bridge between the two. 181 */ 182 @Override 183 public boolean isRecoverySupported() { 184 return false; 185 } 186 187 /** 188 * Recover the task output. 189 * 190 * The retry-count for the job will be passed via the 191 * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in 192 * {@link TaskAttemptContext#getConfiguration()} for the 193 * <code>OutputCommitter</code>. This is called from the application master 194 * process, but it is called individually for each task. 195 * 196 * If an exception is thrown the task will be attempted again. 197 * 198 * @param taskContext Context of the task whose output is being recovered 199 * @throws IOException 200 */ 201 public void recoverTask(TaskAttemptContext taskContext) 202 throws IOException { 203 } 204 205 /** 206 * This method implements the new interface by calling the old method. Note 207 * that the input types are different between the new and old apis and this 208 * is a bridge between the two. 209 */ 210 @Override 211 public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext 212 ) throws IOException { 213 setupJob((JobContext) jobContext); 214 } 215 216 /** 217 * This method implements the new interface by calling the old method. Note 218 * that the input types are different between the new and old apis and this 219 * is a bridge between the two. 220 * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)} 221 * or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)} 222 * instead. 223 */ 224 @Override 225 @Deprecated 226 public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context 227 ) throws IOException { 228 cleanupJob((JobContext) context); 229 } 230 231 /** 232 * This method implements the new interface by calling the old method. Note 233 * that the input types are different between the new and old apis and this 234 * is a bridge between the two. 235 */ 236 @Override 237 public final void commitJob(org.apache.hadoop.mapreduce.JobContext context 238 ) throws IOException { 239 commitJob((JobContext) context); 240 } 241 242 /** 243 * This method implements the new interface by calling the old method. Note 244 * that the input types are different between the new and old apis and this 245 * is a bridge between the two. 246 */ 247 @Override 248 public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 249 org.apache.hadoop.mapreduce.JobStatus.State runState) 250 throws IOException { 251 int state = JobStatus.getOldNewJobRunState(runState); 252 if (state != JobStatus.FAILED && state != JobStatus.KILLED) { 253 throw new IOException ("Invalid job run state : " + runState.name()); 254 } 255 abortJob((JobContext) context, state); 256 } 257 258 /** 259 * This method implements the new interface by calling the old method. Note 260 * that the input types are different between the new and old apis and this 261 * is a bridge between the two. 262 */ 263 @Override 264 public final 265 void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 266 ) throws IOException { 267 setupTask((TaskAttemptContext) taskContext); 268 } 269 270 /** 271 * This method implements the new interface by calling the old method. Note 272 * that the input types are different between the new and old apis and this 273 * is a bridge between the two. 274 */ 275 @Override 276 public final boolean 277 needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 278 ) throws IOException { 279 return needsTaskCommit((TaskAttemptContext) taskContext); 280 } 281 282 /** 283 * This method implements the new interface by calling the old method. Note 284 * that the input types are different between the new and old apis and this 285 * is a bridge between the two. 286 */ 287 @Override 288 public final 289 void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 290 ) throws IOException { 291 commitTask((TaskAttemptContext) taskContext); 292 } 293 294 /** 295 * This method implements the new interface by calling the old method. Note 296 * that the input types are different between the new and old apis and this 297 * is a bridge between the two. 298 */ 299 @Override 300 public final 301 void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 302 ) throws IOException { 303 abortTask((TaskAttemptContext) taskContext); 304 } 305 306 /** 307 * This method implements the new interface by calling the old method. Note 308 * that the input types are different between the new and old apis and this 309 * is a bridge between the two. 310 */ 311 @Override 312 public final 313 void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext 314 ) throws IOException { 315 recoverTask((TaskAttemptContext) taskContext); 316 } 317 318 }