001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.IOException;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceAudience.Private;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.fs.FileStatus;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.hadoop.fs.PathFilter;
032 import org.apache.hadoop.mapreduce.JobContext;
033 import org.apache.hadoop.mapreduce.JobStatus;
034 import org.apache.hadoop.mapreduce.MRJobConfig;
035 import org.apache.hadoop.mapreduce.OutputCommitter;
036 import org.apache.hadoop.mapreduce.TaskAttemptContext;
037 import org.apache.hadoop.mapreduce.TaskAttemptID;
038
039 /** An {@link OutputCommitter} that commits files specified
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042 @InterfaceAudience.Public
043 @InterfaceStability.Stable
044 public class FileOutputCommitter extends OutputCommitter {
045 private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046
047 /**
048 * Name of directory where pending data is placed. Data that has not been
049 * committed yet.
050 */
051 public static final String PENDING_DIR_NAME = "_temporary";
052 /**
053 * Temporary directory name
054 *
055 * The static variable to be compatible with M/R 1.x
056 */
057 @Deprecated
058 protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
059 public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
060 public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
061 "mapreduce.fileoutputcommitter.marksuccessfuljobs";
062 private Path outputPath = null;
063 private Path workPath = null;
064
065 /**
066 * Create a file output committer
067 * @param outputPath the job's output path, or null if you want the output
068 * committer to act as a noop.
069 * @param context the task's context
070 * @throws IOException
071 */
072 public FileOutputCommitter(Path outputPath,
073 TaskAttemptContext context) throws IOException {
074 this(outputPath, (JobContext)context);
075 if (outputPath != null) {
076 workPath = getTaskAttemptPath(context, outputPath);
077 }
078 }
079
080 /**
081 * Create a file output committer
082 * @param outputPath the job's output path, or null if you want the output
083 * committer to act as a noop.
084 * @param context the task's context
085 * @throws IOException
086 */
087 @Private
088 public FileOutputCommitter(Path outputPath,
089 JobContext context) throws IOException {
090 if (outputPath != null) {
091 FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
092 this.outputPath = fs.makeQualified(outputPath);
093 }
094 }
095
096 /**
097 * @return the path where final output of the job should be placed. This
098 * could also be considered the committed application attempt path.
099 */
100 private Path getOutputPath() {
101 return this.outputPath;
102 }
103
104 /**
105 * @return true if we have an output path set, else false.
106 */
107 private boolean hasOutputPath() {
108 return this.outputPath != null;
109 }
110
111 /**
112 * @return the path where the output of pending job attempts are
113 * stored.
114 */
115 private Path getPendingJobAttemptsPath() {
116 return getPendingJobAttemptsPath(getOutputPath());
117 }
118
119 /**
120 * Get the location of pending job attempts.
121 * @param out the base output directory.
122 * @return the location of pending job attempts.
123 */
124 private static Path getPendingJobAttemptsPath(Path out) {
125 return new Path(out, PENDING_DIR_NAME);
126 }
127
128 /**
129 * Get the Application Attempt Id for this job
130 * @param context the context to look in
131 * @return the Application Attempt Id for a given job.
132 */
133 private static int getAppAttemptId(JobContext context) {
134 return context.getConfiguration().getInt(
135 MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
136 }
137
138 /**
139 * Compute the path where the output of a given job attempt will be placed.
140 * @param context the context of the job. This is used to get the
141 * application attempt id.
142 * @return the path to store job attempt data.
143 */
144 public Path getJobAttemptPath(JobContext context) {
145 return getJobAttemptPath(context, getOutputPath());
146 }
147
148 /**
149 * Compute the path where the output of a given job attempt will be placed.
150 * @param context the context of the job. This is used to get the
151 * application attempt id.
152 * @param out the output path to place these in.
153 * @return the path to store job attempt data.
154 */
155 public static Path getJobAttemptPath(JobContext context, Path out) {
156 return getJobAttemptPath(getAppAttemptId(context), out);
157 }
158
159 /**
160 * Compute the path where the output of a given job attempt will be placed.
161 * @param appAttemptId the ID of the application attempt for this job.
162 * @return the path to store job attempt data.
163 */
164 protected Path getJobAttemptPath(int appAttemptId) {
165 return getJobAttemptPath(appAttemptId, getOutputPath());
166 }
167
168 /**
169 * Compute the path where the output of a given job attempt will be placed.
170 * @param appAttemptId the ID of the application attempt for this job.
171 * @return the path to store job attempt data.
172 */
173 private static Path getJobAttemptPath(int appAttemptId, Path out) {
174 return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
175 }
176
177 /**
178 * Compute the path where the output of pending task attempts are stored.
179 * @param context the context of the job with pending tasks.
180 * @return the path where the output of pending task attempts are stored.
181 */
182 private Path getPendingTaskAttemptsPath(JobContext context) {
183 return getPendingTaskAttemptsPath(context, getOutputPath());
184 }
185
186 /**
187 * Compute the path where the output of pending task attempts are stored.
188 * @param context the context of the job with pending tasks.
189 * @return the path where the output of pending task attempts are stored.
190 */
191 private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
192 return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
193 }
194
195 /**
196 * Compute the path where the output of a task attempt is stored until
197 * that task is committed.
198 *
199 * @param context the context of the task attempt.
200 * @return the path where a task attempt should be stored.
201 */
202 public Path getTaskAttemptPath(TaskAttemptContext context) {
203 return new Path(getPendingTaskAttemptsPath(context),
204 String.valueOf(context.getTaskAttemptID()));
205 }
206
207 /**
208 * Compute the path where the output of a task attempt is stored until
209 * that task is committed.
210 *
211 * @param context the context of the task attempt.
212 * @param out The output path to put things in.
213 * @return the path where a task attempt should be stored.
214 */
215 public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
216 return new Path(getPendingTaskAttemptsPath(context, out),
217 String.valueOf(context.getTaskAttemptID()));
218 }
219
220 /**
221 * Compute the path where the output of a committed task is stored until
222 * the entire job is committed.
223 * @param context the context of the task attempt
224 * @return the path where the output of a committed task is stored until
225 * the entire job is committed.
226 */
227 public Path getCommittedTaskPath(TaskAttemptContext context) {
228 return getCommittedTaskPath(getAppAttemptId(context), context);
229 }
230
231 public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
232 return getCommittedTaskPath(getAppAttemptId(context), context, out);
233 }
234
235 /**
236 * Compute the path where the output of a committed task is stored until the
237 * entire job is committed for a specific application attempt.
238 * @param appAttemptId the id of the application attempt to use
239 * @param context the context of any task.
240 * @return the path where the output of a committed task is stored.
241 */
242 protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
243 return new Path(getJobAttemptPath(appAttemptId),
244 String.valueOf(context.getTaskAttemptID().getTaskID()));
245 }
246
247 private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
248 return new Path(getJobAttemptPath(appAttemptId, out),
249 String.valueOf(context.getTaskAttemptID().getTaskID()));
250 }
251
252 private static class CommittedTaskFilter implements PathFilter {
253 @Override
254 public boolean accept(Path path) {
255 return !PENDING_DIR_NAME.equals(path.getName());
256 }
257 }
258
259 /**
260 * Get a list of all paths where output from committed tasks are stored.
261 * @param context the context of the current job
262 * @return the list of these Paths/FileStatuses.
263 * @throws IOException
264 */
265 private FileStatus[] getAllCommittedTaskPaths(JobContext context)
266 throws IOException {
267 Path jobAttemptPath = getJobAttemptPath(context);
268 FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
269 return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
270 }
271
272 /**
273 * Get the directory that the task should write results into.
274 * @return the work directory
275 * @throws IOException
276 */
277 public Path getWorkPath() throws IOException {
278 return workPath;
279 }
280
281 /**
282 * Create the temporary directory that is the root of all of the task
283 * work directories.
284 * @param context the job's context
285 */
286 public void setupJob(JobContext context) throws IOException {
287 if (hasOutputPath()) {
288 Path jobAttemptPath = getJobAttemptPath(context);
289 FileSystem fs = jobAttemptPath.getFileSystem(
290 context.getConfiguration());
291 if (!fs.mkdirs(jobAttemptPath)) {
292 LOG.error("Mkdirs failed to create " + jobAttemptPath);
293 }
294 } else {
295 LOG.warn("Output Path is null in setupJob()");
296 }
297 }
298
299 /**
300 * The job has completed so move all committed tasks to the final output dir.
301 * Delete the temporary directory, including all of the work directories.
302 * Create a _SUCCESS file to make it as successful.
303 * @param context the job's context
304 */
305 public void commitJob(JobContext context) throws IOException {
306 if (hasOutputPath()) {
307 Path finalOutput = getOutputPath();
308 FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
309 for(FileStatus stat: getAllCommittedTaskPaths(context)) {
310 mergePaths(fs, stat, finalOutput);
311 }
312
313 // delete the _temporary folder and create a _done file in the o/p folder
314 cleanupJob(context);
315 // True if the job requires output.dir marked on successful job.
316 // Note that by default it is set to true.
317 if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
318 Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
319 fs.create(markerPath).close();
320 }
321 } else {
322 LOG.warn("Output Path is null in commitJob()");
323 }
324 }
325
326 /**
327 * Merge two paths together. Anything in from will be moved into to, if there
328 * are any name conflicts while merging the files or directories in from win.
329 * @param fs the File System to use
330 * @param from the path data is coming from.
331 * @param to the path data is going to.
332 * @throws IOException on any error
333 */
334 private static void mergePaths(FileSystem fs, final FileStatus from,
335 final Path to)
336 throws IOException {
337 LOG.debug("Merging data from "+from+" to "+to);
338 if(from.isFile()) {
339 if(fs.exists(to)) {
340 if(!fs.delete(to, true)) {
341 throw new IOException("Failed to delete "+to);
342 }
343 }
344
345 if(!fs.rename(from.getPath(), to)) {
346 throw new IOException("Failed to rename "+from+" to "+to);
347 }
348 } else if(from.isDirectory()) {
349 if(fs.exists(to)) {
350 FileStatus toStat = fs.getFileStatus(to);
351 if(!toStat.isDirectory()) {
352 if(!fs.delete(to, true)) {
353 throw new IOException("Failed to delete "+to);
354 }
355 if(!fs.rename(from.getPath(), to)) {
356 throw new IOException("Failed to rename "+from+" to "+to);
357 }
358 } else {
359 //It is a directory so merge everything in the directories
360 for(FileStatus subFrom: fs.listStatus(from.getPath())) {
361 Path subTo = new Path(to, subFrom.getPath().getName());
362 mergePaths(fs, subFrom, subTo);
363 }
364 }
365 } else {
366 //it does not exist just rename
367 if(!fs.rename(from.getPath(), to)) {
368 throw new IOException("Failed to rename "+from+" to "+to);
369 }
370 }
371 }
372 }
373
374 @Override
375 @Deprecated
376 public void cleanupJob(JobContext context) throws IOException {
377 if (hasOutputPath()) {
378 Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
379 FileSystem fs = pendingJobAttemptsPath
380 .getFileSystem(context.getConfiguration());
381 fs.delete(pendingJobAttemptsPath, true);
382 } else {
383 LOG.warn("Output Path is null in cleanupJob()");
384 }
385 }
386
387 /**
388 * Delete the temporary directory, including all of the work directories.
389 * @param context the job's context
390 */
391 @Override
392 public void abortJob(JobContext context, JobStatus.State state)
393 throws IOException {
394 // delete the _temporary folder
395 cleanupJob(context);
396 }
397
398 /**
399 * No task setup required.
400 */
401 @Override
402 public void setupTask(TaskAttemptContext context) throws IOException {
403 // FileOutputCommitter's setupTask doesn't do anything. Because the
404 // temporary task directory is created on demand when the
405 // task is writing.
406 }
407
408 /**
409 * Move the files from the work directory to the job output directory
410 * @param context the task context
411 */
412 @Override
413 public void commitTask(TaskAttemptContext context)
414 throws IOException {
415 commitTask(context, null);
416 }
417
418 @Private
419 public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
420 throws IOException {
421 TaskAttemptID attemptId = context.getTaskAttemptID();
422 if (hasOutputPath()) {
423 context.progress();
424 if(taskAttemptPath == null) {
425 taskAttemptPath = getTaskAttemptPath(context);
426 }
427 Path committedTaskPath = getCommittedTaskPath(context);
428 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
429 if (fs.exists(taskAttemptPath)) {
430 if(fs.exists(committedTaskPath)) {
431 if(!fs.delete(committedTaskPath, true)) {
432 throw new IOException("Could not delete " + committedTaskPath);
433 }
434 }
435 if(!fs.rename(taskAttemptPath, committedTaskPath)) {
436 throw new IOException("Could not rename " + taskAttemptPath + " to "
437 + committedTaskPath);
438 }
439 LOG.info("Saved output of task '" + attemptId + "' to " +
440 committedTaskPath);
441 } else {
442 LOG.warn("No Output found for " + attemptId);
443 }
444 } else {
445 LOG.warn("Output Path is null in commitTask()");
446 }
447 }
448
449 /**
450 * Delete the work directory
451 * @throws IOException
452 */
453 @Override
454 public void abortTask(TaskAttemptContext context) throws IOException {
455 abortTask(context, null);
456 }
457
458 @Private
459 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
460 if (hasOutputPath()) {
461 context.progress();
462 if(taskAttemptPath == null) {
463 taskAttemptPath = getTaskAttemptPath(context);
464 }
465 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
466 if(!fs.delete(taskAttemptPath, true)) {
467 LOG.warn("Could not delete "+taskAttemptPath);
468 }
469 } else {
470 LOG.warn("Output Path is null in abortTask()");
471 }
472 }
473
474 /**
475 * Did this task write any files in the work directory?
476 * @param context the task's context
477 */
478 @Override
479 public boolean needsTaskCommit(TaskAttemptContext context
480 ) throws IOException {
481 return needsTaskCommit(context, null);
482 }
483
484 @Private
485 public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
486 ) throws IOException {
487 if(hasOutputPath()) {
488 if(taskAttemptPath == null) {
489 taskAttemptPath = getTaskAttemptPath(context);
490 }
491 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
492 return fs.exists(taskAttemptPath);
493 }
494 return false;
495 }
496
497 @Override
498 public boolean isRecoverySupported() {
499 return true;
500 }
501
502 @Override
503 public void recoverTask(TaskAttemptContext context)
504 throws IOException {
505 if(hasOutputPath()) {
506 context.progress();
507 TaskAttemptID attemptId = context.getTaskAttemptID();
508 int previousAttempt = getAppAttemptId(context) - 1;
509 if (previousAttempt < 0) {
510 throw new IOException ("Cannot recover task output for first attempt...");
511 }
512
513 Path committedTaskPath = getCommittedTaskPath(context);
514 Path previousCommittedTaskPath = getCommittedTaskPath(
515 previousAttempt, context);
516 FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
517
518 LOG.debug("Trying to recover task from " + previousCommittedTaskPath
519 + " into " + committedTaskPath);
520 if (fs.exists(previousCommittedTaskPath)) {
521 if(fs.exists(committedTaskPath)) {
522 if(!fs.delete(committedTaskPath, true)) {
523 throw new IOException("Could not delete "+committedTaskPath);
524 }
525 }
526 //Rename can fail if the parent directory does not yet exist.
527 Path committedParent = committedTaskPath.getParent();
528 fs.mkdirs(committedParent);
529 if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
530 throw new IOException("Could not rename " + previousCommittedTaskPath +
531 " to " + committedTaskPath);
532 }
533 LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
534 } else {
535 LOG.warn(attemptId+" had no output to recover.");
536 }
537 } else {
538 LOG.warn("Output Path is null in recoverTask()");
539 }
540 }
541 }