/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/**
 * An {@link OutputCommitter} that commits files specified
 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 *
 * <p>Directory layout used by this committer, relative to the job output
 * directory:
 * <pre>
 *   _temporary/                                  pending job attempts
 *   _temporary/&lt;appAttemptId&gt;/                   one job attempt
 *   _temporary/&lt;appAttemptId&gt;/&lt;taskId&gt;            committed task output
 *   _temporary/&lt;appAttemptId&gt;/_temporary/        pending task attempts
 *   _temporary/&lt;appAttemptId&gt;/_temporary/&lt;taskAttemptId&gt;
 *                                                work dir of a task attempt
 * </pre>
 *
 * <p>If the committer is constructed with a {@code null} output path, every
 * operation logs a warning (or returns {@code false}) and does nothing.
 **/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
  private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);

  /**
   * Name of directory where pending data is placed.  Data that has not been
   * committed yet.
   */
  public static final String PENDING_DIR_NAME = "_temporary";

  /** Name of the empty marker file created in the output directory by
   * {@link #commitJob(JobContext)}. */
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";

  /** Configuration key controlling whether {@link #commitJob(JobContext)}
   * creates the {@link #SUCCEEDED_FILE_NAME} marker (read with a default of
   * {@code true}). */
  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";

  /** Fully qualified job output path, or null when acting as a no-op. */
  private Path outputPath = null;

  /** Work directory of the task attempt; only set by the
   * {@link TaskAttemptContext} constructor when an output path is given. */
  private Path workPath = null;

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop.
   * @param context the task's context
   * @throws IOException if the file system of the output path cannot be
   * obtained
   */
  public FileOutputCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext) context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

  /**
   * Create a file output committer
   * @param outputPath the job's output path, or null if you want the output
   * committer to act as a noop. A non-null path is qualified against its
   * file system before being stored.
   * @param context the job's context
   * @throws IOException if the file system of the output path cannot be
   * obtained
   */
  @Private
  public FileOutputCommitter(Path outputPath,
      JobContext context) throws IOException {
    if (outputPath != null) {
      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
      this.outputPath = fs.makeQualified(outputPath);
    }
  }

  /**
   * @return the path where final output of the job should be placed.  This
   * could also be considered the committed application attempt path.
   */
  private Path getOutputPath() {
    return this.outputPath;
  }

  /**
   * @return true if we have an output path set, else false.
   */
  private boolean hasOutputPath() {
    return this.outputPath != null;
  }

  /**
   * @return the path where the output of pending job attempts are
   * stored.
   */
  private Path getPendingJobAttemptsPath() {
    return getPendingJobAttemptsPath(getOutputPath());
  }

  /**
   * Get the location of pending job attempts.
   * @param out the base output directory.
   * @return the location of pending job attempts.
   */
  private static Path getPendingJobAttemptsPath(Path out) {
    return new Path(out, PENDING_DIR_NAME);
  }

  /**
   * Get the Application Attempt Id for this job
   * @param context the context to look in
   * @return the Application Attempt Id for a given job, or 0 if the
   * configuration does not define one.
   */
  private static int getAppAttemptId(JobContext context) {
    return context.getConfiguration().getInt(
        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @return the path to store job attempt data.
   */
  public Path getJobAttemptPath(JobContext context) {
    return getJobAttemptPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  public static Path getJobAttemptPath(JobContext context, Path out) {
    return getJobAttemptPath(getAppAttemptId(context), out);
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @return the path to store job attempt data.
   */
  private Path getJobAttemptPath(int appAttemptId) {
    return getJobAttemptPath(appAttemptId, getOutputPath());
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @param out the output path to place these in.
   * @return the path to store job attempt data.
   */
  private static Path getJobAttemptPath(int appAttemptId, Path out) {
    return new Path(getPendingJobAttemptsPath(out),
        String.valueOf(appAttemptId));
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @return the path where the output of pending task attempts are stored.
   */
  private Path getPendingTaskAttemptsPath(JobContext context) {
    return getPendingTaskAttemptsPath(context, getOutputPath());
  }

  /**
   * Compute the path where the output of pending task attempts are stored.
   * @param context the context of the job with pending tasks.
   * @param out the output path to place these in.
   * @return the path where the output of pending task attempts are stored.
   */
  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  public Path getTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getPendingTaskAttemptsPath(context),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed.
   *
   * @param context the context of the task attempt.
   * @param out The output path to put things in.
   * @return the path where a task attempt should be stored.
   */
  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
    return new Path(getPendingTaskAttemptsPath(context, out),
        String.valueOf(context.getTaskAttemptID()));
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public Path getCommittedTaskPath(TaskAttemptContext context) {
    return getCommittedTaskPath(getAppAttemptId(context), context);
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @param out the output path to place these in.
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
    return getCommittedTaskPath(getAppAttemptId(context), context, out);
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @return the path where the output of a committed task is stored.
   */
  private Path getCommittedTaskPath(int appAttemptId,
      TaskAttemptContext context) {
    return new Path(getJobAttemptPath(appAttemptId),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Compute the path where the output of a committed task is stored until the
   * entire job is committed for a specific application attempt.
   * @param appAttemptId the id of the application attempt to use
   * @param context the context of any task.
   * @param out the output path to place these in.
   * @return the path where the output of a committed task is stored.
   */
  private static Path getCommittedTaskPath(int appAttemptId,
      TaskAttemptContext context, Path out) {
    return new Path(getJobAttemptPath(appAttemptId, out),
        String.valueOf(context.getTaskAttemptID().getTaskID()));
  }

  /**
   * Accepts every path whose final name component is not
   * {@link #PENDING_DIR_NAME}; used to skip the pending task attempts
   * directory when listing a job attempt directory.
   */
  private static class CommittedTaskFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      return !PENDING_DIR_NAME.equals(path.getName());
    }
  }

  /**
   * Get a list of all paths where output from committed tasks are stored.
   * @param context the context of the current job
   * @return the list of these Paths/FileStatuses.
   * @throws IOException if listing the job attempt directory fails
   */
  private FileStatus[] getAllCommittedTaskPaths(JobContext context)
      throws IOException {
    Path jobAttemptPath = getJobAttemptPath(context);
    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
  }

  /**
   * Get the directory that the task should write results into.
   * @return the work directory, or null if this committer was not created
   * with a task attempt context and a non-null output path
   * @throws IOException declared for subclasses/callers; not thrown here
   */
  public Path getWorkPath() throws IOException {
    return workPath;
  }

  /**
   * Create the temporary directory that is the root of all of the task
   * work directories.  A failed mkdirs is logged as an error but does not
   * abort the call.
   * @param context the job's context
   * @throws IOException if the file system cannot be obtained or mkdirs
   * itself throws
   */
  @Override
  public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");
    }
  }

  /**
   * The job has completed so move all committed tasks to the final output dir.
   * Delete the temporary directory, including all of the work directories.
   * Create a _SUCCESS file to mark it as successful.
   * @param context the job's context
   * @throws IOException if merging, cleanup or marker creation fails
   */
  @Override
  public void commitJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path finalOutput = getOutputPath();
      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
      for (FileStatus stat : getAllCommittedTaskPaths(context)) {
        mergePaths(fs, stat, finalOutput);
      }

      // delete the _temporary folder before writing the success marker
      cleanupJob(context);
      // True if the job requires output.dir marked on successful job.
      // Note that by default it is set to true.
      if (context.getConfiguration().getBoolean(
          SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
        Path markerPath = new Path(finalOutput, SUCCEEDED_FILE_NAME);
        fs.create(markerPath).close();
      }
    } else {
      LOG.warn("Output Path is null in commitJob()");
    }
  }

  /**
   * Merge two paths together.  Anything in from will be moved into to, if there
   * are any name conflicts while merging the files or directories in from win.
   * An entry that is neither a file nor a directory is left untouched.
   * @param fs the File System to use
   * @param from the path data is coming from.
   * @param to the path data is going to.
   * @throws IOException on any error
   */
  private static void mergePaths(FileSystem fs, final FileStatus from,
      final Path to)
      throws IOException {
    // This method recurses once per path, so avoid building the message
    // unless debug logging is actually on.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Merging data from "+from+" to "+to);
    }
    if (from.isFile()) {
      // A file always replaces whatever currently sits at the destination.
      if (fs.exists(to)) {
        if (!fs.delete(to, true)) {
          throw new IOException("Failed to delete "+to);
        }
      }

      if (!fs.rename(from.getPath(), to)) {
        throw new IOException("Failed to rename "+from+" to "+to);
      }
    } else if (from.isDirectory()) {
      if (fs.exists(to)) {
        FileStatus toStat = fs.getFileStatus(to);
        if (!toStat.isDirectory()) {
          // Destination is not a directory: the source directory replaces it.
          if (!fs.delete(to, true)) {
            throw new IOException("Failed to delete "+to);
          }
          if (!fs.rename(from.getPath(), to)) {
            throw new IOException("Failed to rename "+from+" to "+to);
          }
        } else {
          //It is a directory so merge everything in the directories
          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
            Path subTo = new Path(to, subFrom.getPath().getName());
            mergePaths(fs, subFrom, subTo);
          }
        }
      } else {
        //it does not exist just rename
        if (!fs.rename(from.getPath(), to)) {
          throw new IOException("Failed to rename "+from+" to "+to);
        }
      }
    }
  }

  /**
   * Recursively delete the pending job attempts directory
   * ({@link #PENDING_DIR_NAME} under the output path).  The result of the
   * delete is not checked.  Both {@link #commitJob(JobContext)} and
   * {@link #abortJob(JobContext, JobStatus.State)} delegate to this method.
   * @param context the job's context
   * @throws IOException if the file system cannot be obtained or the delete
   * itself throws
   */
  @Override
  @Deprecated
  public void cleanupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
      FileSystem fs = pendingJobAttemptsPath
          .getFileSystem(context.getConfiguration());
      fs.delete(pendingJobAttemptsPath, true);
    } else {
      LOG.warn("Output Path is null in cleanupJob()");
    }
  }

  /**
   * Delete the temporary directory, including all of the work directories.
   * @param context the job's context
   * @param state final run state of the job; not consulted by this
   * implementation
   * @throws IOException if the cleanup fails
   */
  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    // delete the _temporary folder
    cleanupJob(context);
  }

  /**
   * No task setup required.
   */
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    // FileOutputCommitter's setupTask doesn't do anything. Because the
    // temporary task directory is created on demand when the
    // task is writing.
  }

  /**
   * Move the files from the work directory to the job output directory
   * @param context the task context
   * @throws IOException if the task output cannot be moved
   */
  @Override
  public void commitTask(TaskAttemptContext context)
      throws IOException {
    commitTask(context, null);
  }

  /**
   * Move the output of a task attempt to its committed task path, replacing
   * any output already committed for the same task.
   * @param context the task context
   * @param taskAttemptPath the directory holding the attempt's output, or
   * null to use {@link #getTaskAttemptPath(TaskAttemptContext)}
   * @throws IOException if an existing committed path cannot be deleted or
   * the rename fails
   */
  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
      throws IOException {
    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if (taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      Path committedTaskPath = getCommittedTaskPath(context);
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if (fs.exists(taskAttemptPath)) {
        // Clear out any earlier commit of this task before renaming.
        if (fs.exists(committedTaskPath)) {
          if (!fs.delete(committedTaskPath, true)) {
            throw new IOException("Could not delete " + committedTaskPath);
          }
        }
        if (!fs.rename(taskAttemptPath, committedTaskPath)) {
          throw new IOException("Could not rename " + taskAttemptPath + " to "
              + committedTaskPath);
        }
        LOG.info("Saved output of task '" + attemptId + "' to " +
            committedTaskPath);
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

  /**
   * Delete the work directory
   * @param context the task context
   * @throws IOException if the file system cannot be obtained or the delete
   * itself throws
   */
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    abortTask(context, null);
  }

  /**
   * Recursively delete the output directory of a task attempt.  A delete
   * that returns false is logged as a warning, not raised.
   * @param context the task context
   * @param taskAttemptPath the directory to delete, or null to use
   * {@link #getTaskAttemptPath(TaskAttemptContext)}
   * @throws IOException if the file system cannot be obtained or the delete
   * itself throws
   */
  @Private
  public void abortTask(TaskAttemptContext context, Path taskAttemptPath)
      throws IOException {
    if (hasOutputPath()) {
      context.progress();
      if (taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if (!fs.delete(taskAttemptPath, true)) {
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

  /**
   * Did this task write any files in the work directory?
   * @param context the task's context
   * @return true if the task attempt's work directory exists
   * @throws IOException if the existence check fails
   */
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context
                                 ) throws IOException {
    return needsTaskCommit(context, null);
  }

  /**
   * Check whether a task attempt has output that needs committing.
   * @param context the task's context
   * @param taskAttemptPath the directory to check, or null to use
   * {@link #getTaskAttemptPath(TaskAttemptContext)}
   * @return true if an output path is set and the task attempt path exists,
   * false otherwise
   * @throws IOException if the existence check fails
   */
  @Private
  public boolean needsTaskCommit(TaskAttemptContext context,
      Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) {
      if (taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      return fs.exists(taskAttemptPath);
    }
    return false;
  }

  /**
   * @return always true; see {@link #recoverTask(TaskAttemptContext)}.
   */
  @Override
  public boolean isRecoverySupported() {
    return true;
  }

  /**
   * Move the committed output of this task from the previous application
   * attempt (current attempt id minus one) into the committed task path of
   * the current application attempt.
   * @param context the task context
   * @throws IOException if the current application attempt id is 0, or if a
   * delete or rename needed for the recovery fails
   */
  @Override
  public void recoverTask(TaskAttemptContext context)
      throws IOException {
    if (hasOutputPath()) {
      context.progress();
      TaskAttemptID attemptId = context.getTaskAttemptID();
      int previousAttempt = getAppAttemptId(context) - 1;
      if (previousAttempt < 0) {
        throw new IOException ("Cannot recover task output for first attempt...");
      }

      Path committedTaskPath = getCommittedTaskPath(context);
      Path previousCommittedTaskPath = getCommittedTaskPath(
          previousAttempt, context);
      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());

      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to recover task from " + previousCommittedTaskPath
            + " into " + committedTaskPath);
      }
      if (fs.exists(previousCommittedTaskPath)) {
        if (fs.exists(committedTaskPath)) {
          if (!fs.delete(committedTaskPath, true)) {
            throw new IOException("Could not delete "+committedTaskPath);
          }
        }
        //Rename can fail if the parent directory does not yet exist.
        Path committedParent = committedTaskPath.getParent();
        fs.mkdirs(committedParent);
        if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
          throw new IOException("Could not rename " + previousCommittedTaskPath +
              " to " + committedTaskPath);
        }
        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
      } else {
        LOG.warn(attemptId+" had no output to recover.");
      }
    } else {
      LOG.warn("Output Path is null in recoverTask()");
    }
  }
}