001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.IOException; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceAudience.Private; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.fs.PathFilter; 032import org.apache.hadoop.mapreduce.JobContext; 033import org.apache.hadoop.mapreduce.JobStatus; 034import org.apache.hadoop.mapreduce.MRJobConfig; 035import org.apache.hadoop.mapreduce.OutputCommitter; 036import org.apache.hadoop.mapreduce.TaskAttemptContext; 037import org.apache.hadoop.mapreduce.TaskAttemptID; 038 039/** An {@link OutputCommitter} that commits files specified 040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 041 **/ 042@InterfaceAudience.Public 043@InterfaceStability.Stable 044public class FileOutputCommitter extends OutputCommitter { 045 private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class); 046 047 /** 048 * Name of directory where pending data is placed. Data that has not been 049 * committed yet. 050 */ 051 public static final String PENDING_DIR_NAME = "_temporary"; 052 /** 053 * Temporary directory name 054 * 055 * The static variable to be compatible with M/R 1.x 056 */ 057 @Deprecated 058 protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME; 059 public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; 060 public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 061 "mapreduce.fileoutputcommitter.marksuccessfuljobs"; 062 private Path outputPath = null; 063 private Path workPath = null; 064 065 /** 066 * Create a file output committer 067 * @param outputPath the job's output path, or null if you want the output 068 * committer to act as a noop. 069 * @param context the task's context 070 * @throws IOException 071 */ 072 public FileOutputCommitter(Path outputPath, 073 TaskAttemptContext context) throws IOException { 074 this(outputPath, (JobContext)context); 075 if (outputPath != null) { 076 workPath = getTaskAttemptPath(context, outputPath); 077 } 078 } 079 080 /** 081 * Create a file output committer 082 * @param outputPath the job's output path, or null if you want the output 083 * committer to act as a noop. 084 * @param context the task's context 085 * @throws IOException 086 */ 087 @Private 088 public FileOutputCommitter(Path outputPath, 089 JobContext context) throws IOException { 090 if (outputPath != null) { 091 FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); 092 this.outputPath = fs.makeQualified(outputPath); 093 } 094 } 095 096 /** 097 * @return the path where final output of the job should be placed. This 098 * could also be considered the committed application attempt path. 099 */ 100 private Path getOutputPath() { 101 return this.outputPath; 102 } 103 104 /** 105 * @return true if we have an output path set, else false. 106 */ 107 private boolean hasOutputPath() { 108 return this.outputPath != null; 109 } 110 111 /** 112 * @return the path where the output of pending job attempts are 113 * stored. 114 */ 115 private Path getPendingJobAttemptsPath() { 116 return getPendingJobAttemptsPath(getOutputPath()); 117 } 118 119 /** 120 * Get the location of pending job attempts. 121 * @param out the base output directory. 122 * @return the location of pending job attempts. 123 */ 124 private static Path getPendingJobAttemptsPath(Path out) { 125 return new Path(out, PENDING_DIR_NAME); 126 } 127 128 /** 129 * Get the Application Attempt Id for this job 130 * @param context the context to look in 131 * @return the Application Attempt Id for a given job. 132 */ 133 private static int getAppAttemptId(JobContext context) { 134 return context.getConfiguration().getInt( 135 MRJobConfig.APPLICATION_ATTEMPT_ID, 0); 136 } 137 138 /** 139 * Compute the path where the output of a given job attempt will be placed. 140 * @param context the context of the job. This is used to get the 141 * application attempt id. 142 * @return the path to store job attempt data. 143 */ 144 public Path getJobAttemptPath(JobContext context) { 145 return getJobAttemptPath(context, getOutputPath()); 146 } 147 148 /** 149 * Compute the path where the output of a given job attempt will be placed. 150 * @param context the context of the job. This is used to get the 151 * application attempt id. 152 * @param out the output path to place these in. 153 * @return the path to store job attempt data. 154 */ 155 public static Path getJobAttemptPath(JobContext context, Path out) { 156 return getJobAttemptPath(getAppAttemptId(context), out); 157 } 158 159 /** 160 * Compute the path where the output of a given job attempt will be placed. 161 * @param appAttemptId the ID of the application attempt for this job. 162 * @return the path to store job attempt data. 163 */ 164 protected Path getJobAttemptPath(int appAttemptId) { 165 return getJobAttemptPath(appAttemptId, getOutputPath()); 166 } 167 168 /** 169 * Compute the path where the output of a given job attempt will be placed. 170 * @param appAttemptId the ID of the application attempt for this job. 171 * @return the path to store job attempt data. 172 */ 173 private static Path getJobAttemptPath(int appAttemptId, Path out) { 174 return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId)); 175 } 176 177 /** 178 * Compute the path where the output of pending task attempts are stored. 179 * @param context the context of the job with pending tasks. 180 * @return the path where the output of pending task attempts are stored. 181 */ 182 private Path getPendingTaskAttemptsPath(JobContext context) { 183 return getPendingTaskAttemptsPath(context, getOutputPath()); 184 } 185 186 /** 187 * Compute the path where the output of pending task attempts are stored. 188 * @param context the context of the job with pending tasks. 189 * @return the path where the output of pending task attempts are stored. 190 */ 191 private static Path getPendingTaskAttemptsPath(JobContext context, Path out) { 192 return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME); 193 } 194 195 /** 196 * Compute the path where the output of a task attempt is stored until 197 * that task is committed. 198 * 199 * @param context the context of the task attempt. 200 * @return the path where a task attempt should be stored. 201 */ 202 public Path getTaskAttemptPath(TaskAttemptContext context) { 203 return new Path(getPendingTaskAttemptsPath(context), 204 String.valueOf(context.getTaskAttemptID())); 205 } 206 207 /** 208 * Compute the path where the output of a task attempt is stored until 209 * that task is committed. 210 * 211 * @param context the context of the task attempt. 212 * @param out The output path to put things in. 213 * @return the path where a task attempt should be stored. 214 */ 215 public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) { 216 return new Path(getPendingTaskAttemptsPath(context, out), 217 String.valueOf(context.getTaskAttemptID())); 218 } 219 220 /** 221 * Compute the path where the output of a committed task is stored until 222 * the entire job is committed. 223 * @param context the context of the task attempt 224 * @return the path where the output of a committed task is stored until 225 * the entire job is committed. 226 */ 227 public Path getCommittedTaskPath(TaskAttemptContext context) { 228 return getCommittedTaskPath(getAppAttemptId(context), context); 229 } 230 231 public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) { 232 return getCommittedTaskPath(getAppAttemptId(context), context, out); 233 } 234 235 /** 236 * Compute the path where the output of a committed task is stored until the 237 * entire job is committed for a specific application attempt. 238 * @param appAttemptId the id of the application attempt to use 239 * @param context the context of any task. 240 * @return the path where the output of a committed task is stored. 241 */ 242 protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { 243 return new Path(getJobAttemptPath(appAttemptId), 244 String.valueOf(context.getTaskAttemptID().getTaskID())); 245 } 246 247 private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) { 248 return new Path(getJobAttemptPath(appAttemptId, out), 249 String.valueOf(context.getTaskAttemptID().getTaskID())); 250 } 251 252 private static class CommittedTaskFilter implements PathFilter { 253 @Override 254 public boolean accept(Path path) { 255 return !PENDING_DIR_NAME.equals(path.getName()); 256 } 257 } 258 259 /** 260 * Get a list of all paths where output from committed tasks are stored. 261 * @param context the context of the current job 262 * @return the list of these Paths/FileStatuses. 263 * @throws IOException 264 */ 265 private FileStatus[] getAllCommittedTaskPaths(JobContext context) 266 throws IOException { 267 Path jobAttemptPath = getJobAttemptPath(context); 268 FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration()); 269 return fs.listStatus(jobAttemptPath, new CommittedTaskFilter()); 270 } 271 272 /** 273 * Get the directory that the task should write results into. 274 * @return the work directory 275 * @throws IOException 276 */ 277 public Path getWorkPath() throws IOException { 278 return workPath; 279 } 280 281 /** 282 * Create the temporary directory that is the root of all of the task 283 * work directories. 284 * @param context the job's context 285 */ 286 public void setupJob(JobContext context) throws IOException { 287 if (hasOutputPath()) { 288 Path jobAttemptPath = getJobAttemptPath(context); 289 FileSystem fs = jobAttemptPath.getFileSystem( 290 context.getConfiguration()); 291 if (!fs.mkdirs(jobAttemptPath)) { 292 LOG.error("Mkdirs failed to create " + jobAttemptPath); 293 } 294 } else { 295 LOG.warn("Output Path is null in setupJob()"); 296 } 297 } 298 299 /** 300 * The job has completed so move all committed tasks to the final output dir. 301 * Delete the temporary directory, including all of the work directories. 302 * Create a _SUCCESS file to make it as successful. 303 * @param context the job's context 304 */ 305 public void commitJob(JobContext context) throws IOException { 306 if (hasOutputPath()) { 307 Path finalOutput = getOutputPath(); 308 FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); 309 for(FileStatus stat: getAllCommittedTaskPaths(context)) { 310 mergePaths(fs, stat, finalOutput); 311 } 312 313 // delete the _temporary folder and create a _done file in the o/p folder 314 cleanupJob(context); 315 // True if the job requires output.dir marked on successful job. 316 // Note that by default it is set to true. 317 if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) { 318 Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME); 319 fs.create(markerPath).close(); 320 } 321 } else { 322 LOG.warn("Output Path is null in commitJob()"); 323 } 324 } 325 326 /** 327 * Merge two paths together. Anything in from will be moved into to, if there 328 * are any name conflicts while merging the files or directories in from win. 329 * @param fs the File System to use 330 * @param from the path data is coming from. 331 * @param to the path data is going to. 332 * @throws IOException on any error 333 */ 334 private static void mergePaths(FileSystem fs, final FileStatus from, 335 final Path to) 336 throws IOException { 337 LOG.debug("Merging data from "+from+" to "+to); 338 if(from.isFile()) { 339 if(fs.exists(to)) { 340 if(!fs.delete(to, true)) { 341 throw new IOException("Failed to delete "+to); 342 } 343 } 344 345 if(!fs.rename(from.getPath(), to)) { 346 throw new IOException("Failed to rename "+from+" to "+to); 347 } 348 } else if(from.isDirectory()) { 349 if(fs.exists(to)) { 350 FileStatus toStat = fs.getFileStatus(to); 351 if(!toStat.isDirectory()) { 352 if(!fs.delete(to, true)) { 353 throw new IOException("Failed to delete "+to); 354 } 355 if(!fs.rename(from.getPath(), to)) { 356 throw new IOException("Failed to rename "+from+" to "+to); 357 } 358 } else { 359 //It is a directory so merge everything in the directories 360 for(FileStatus subFrom: fs.listStatus(from.getPath())) { 361 Path subTo = new Path(to, subFrom.getPath().getName()); 362 mergePaths(fs, subFrom, subTo); 363 } 364 } 365 } else { 366 //it does not exist just rename 367 if(!fs.rename(from.getPath(), to)) { 368 throw new IOException("Failed to rename "+from+" to "+to); 369 } 370 } 371 } 372 } 373 374 @Override 375 @Deprecated 376 public void cleanupJob(JobContext context) throws IOException { 377 if (hasOutputPath()) { 378 Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); 379 FileSystem fs = pendingJobAttemptsPath 380 .getFileSystem(context.getConfiguration()); 381 fs.delete(pendingJobAttemptsPath, true); 382 } else { 383 LOG.warn("Output Path is null in cleanupJob()"); 384 } 385 } 386 387 /** 388 * Delete the temporary directory, including all of the work directories. 389 * @param context the job's context 390 */ 391 @Override 392 public void abortJob(JobContext context, JobStatus.State state) 393 throws IOException { 394 // delete the _temporary folder 395 cleanupJob(context); 396 } 397 398 /** 399 * No task setup required. 400 */ 401 @Override 402 public void setupTask(TaskAttemptContext context) throws IOException { 403 // FileOutputCommitter's setupTask doesn't do anything. Because the 404 // temporary task directory is created on demand when the 405 // task is writing. 406 } 407 408 /** 409 * Move the files from the work directory to the job output directory 410 * @param context the task context 411 */ 412 @Override 413 public void commitTask(TaskAttemptContext context) 414 throws IOException { 415 commitTask(context, null); 416 } 417 418 @Private 419 public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 420 throws IOException { 421 TaskAttemptID attemptId = context.getTaskAttemptID(); 422 if (hasOutputPath()) { 423 context.progress(); 424 if(taskAttemptPath == null) { 425 taskAttemptPath = getTaskAttemptPath(context); 426 } 427 Path committedTaskPath = getCommittedTaskPath(context); 428 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 429 if (fs.exists(taskAttemptPath)) { 430 if(fs.exists(committedTaskPath)) { 431 if(!fs.delete(committedTaskPath, true)) { 432 throw new IOException("Could not delete " + committedTaskPath); 433 } 434 } 435 if(!fs.rename(taskAttemptPath, committedTaskPath)) { 436 throw new IOException("Could not rename " + taskAttemptPath + " to " 437 + committedTaskPath); 438 } 439 LOG.info("Saved output of task '" + attemptId + "' to " + 440 committedTaskPath); 441 } else { 442 LOG.warn("No Output found for " + attemptId); 443 } 444 } else { 445 LOG.warn("Output Path is null in commitTask()"); 446 } 447 } 448 449 /** 450 * Delete the work directory 451 * @throws IOException 452 */ 453 @Override 454 public void abortTask(TaskAttemptContext context) throws IOException { 455 abortTask(context, null); 456 } 457 458 @Private 459 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException { 460 if (hasOutputPath()) { 461 context.progress(); 462 if(taskAttemptPath == null) { 463 taskAttemptPath = getTaskAttemptPath(context); 464 } 465 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 466 if(!fs.delete(taskAttemptPath, true)) { 467 LOG.warn("Could not delete "+taskAttemptPath); 468 } 469 } else { 470 LOG.warn("Output Path is null in abortTask()"); 471 } 472 } 473 474 /** 475 * Did this task write any files in the work directory? 476 * @param context the task's context 477 */ 478 @Override 479 public boolean needsTaskCommit(TaskAttemptContext context 480 ) throws IOException { 481 return needsTaskCommit(context, null); 482 } 483 484 @Private 485 public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath 486 ) throws IOException { 487 if(hasOutputPath()) { 488 if(taskAttemptPath == null) { 489 taskAttemptPath = getTaskAttemptPath(context); 490 } 491 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration()); 492 return fs.exists(taskAttemptPath); 493 } 494 return false; 495 } 496 497 @Override 498 @Deprecated 499 public boolean isRecoverySupported() { 500 return true; 501 } 502 503 @Override 504 public void recoverTask(TaskAttemptContext context) 505 throws IOException { 506 if(hasOutputPath()) { 507 context.progress(); 508 TaskAttemptID attemptId = context.getTaskAttemptID(); 509 int previousAttempt = getAppAttemptId(context) - 1; 510 if (previousAttempt < 0) { 511 throw new IOException ("Cannot recover task output for first attempt..."); 512 } 513 514 Path committedTaskPath = getCommittedTaskPath(context); 515 Path previousCommittedTaskPath = getCommittedTaskPath( 516 previousAttempt, context); 517 FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration()); 518 519 LOG.debug("Trying to recover task from " + previousCommittedTaskPath 520 + " into " + committedTaskPath); 521 if (fs.exists(previousCommittedTaskPath)) { 522 if(fs.exists(committedTaskPath)) { 523 if(!fs.delete(committedTaskPath, true)) { 524 throw new IOException("Could not delete "+committedTaskPath); 525 } 526 } 527 //Rename can fail if the parent directory does not yet exist. 528 Path committedParent = committedTaskPath.getParent(); 529 fs.mkdirs(committedParent); 530 if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) { 531 throw new IOException("Could not rename " + previousCommittedTaskPath + 532 " to " + committedTaskPath); 533 } 534 LOG.info("Saved output of " + attemptId + " to " + committedTaskPath); 535 } else { 536 LOG.warn(attemptId+" had no output to recover."); 537 } 538 } else { 539 LOG.warn("Output Path is null in recoverTask()"); 540 } 541 } 542}