001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.IOException;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceAudience.Private;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.fs.FileStatus;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.hadoop.fs.PathFilter;
032 import org.apache.hadoop.mapreduce.JobContext;
033 import org.apache.hadoop.mapreduce.JobStatus;
034 import org.apache.hadoop.mapreduce.MRJobConfig;
035 import org.apache.hadoop.mapreduce.OutputCommitter;
036 import org.apache.hadoop.mapreduce.TaskAttemptContext;
037 import org.apache.hadoop.mapreduce.TaskAttemptID;
038
039 /** An {@link OutputCommitter} that commits files specified
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042 @InterfaceAudience.Public
043 @InterfaceStability.Stable
044 public class FileOutputCommitter extends OutputCommitter {
045 private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046
047 /**
048 * Name of directory where pending data is placed. Data that has not been
049 * committed yet.
050 */
051 public static final String PENDING_DIR_NAME = "_temporary";
052 public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
053 public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
054 "mapreduce.fileoutputcommitter.marksuccessfuljobs";
055 private Path outputPath = null;
056 private Path workPath = null;
057
/**
 * Create a file output committer for a task attempt.
 * @param outputPath the job's output path, or null if you want the output
 * committer to act as a noop.
 * @param context the task's context
 * @throws IOException if the file system of the output path cannot be
 * obtained
 */
public FileOutputCommitter(Path outputPath,
    TaskAttemptContext context) throws IOException {
  // The cast selects the JobContext overload, which stores the qualified
  // output path.
  this(outputPath, (JobContext) context);
  if (hasOutputPath()) {
    // Build the work path from the qualified output path so that it is the
    // same path getTaskAttemptPath(context) computes later on.
    workPath = getTaskAttemptPath(context, getOutputPath());
  }
}
072
/**
 * Create a file output committer for a job.
 * @param outputPath the job's output path, or null if you want the output
 * committer to act as a noop.
 * @param context the job's context
 * @throws IOException if the file system of the output path cannot be
 * obtained
 */
@Private
public FileOutputCommitter(Path outputPath,
    JobContext context) throws IOException {
  if (outputPath == null) {
    // No output path: leave the field unset.
    return;
  }
  FileSystem outputFs = outputPath.getFileSystem(context.getConfiguration());
  this.outputPath = outputFs.makeQualified(outputPath);
}
088
089 /**
090 * @return the path where final output of the job should be placed. This
091 * could also be considered the committed application attempt path.
092 */
093 private Path getOutputPath() {
094 return this.outputPath;
095 }
096
097 /**
098 * @return true if we have an output path set, else false.
099 */
100 private boolean hasOutputPath() {
101 return this.outputPath != null;
102 }
103
104 /**
105 * @return the path where the output of pending job attempts are
106 * stored.
107 */
108 private Path getPendingJobAttemptsPath() {
109 return getPendingJobAttemptsPath(getOutputPath());
110 }
111
112 /**
113 * Get the location of pending job attempts.
114 * @param out the base output directory.
115 * @return the location of pending job attempts.
116 */
117 private static Path getPendingJobAttemptsPath(Path out) {
118 return new Path(out, PENDING_DIR_NAME);
119 }
120
121 /**
122 * Get the Application Attempt Id for this job
123 * @param context the context to look in
124 * @return the Application Attempt Id for a given job.
125 */
126 private static int getAppAttemptId(JobContext context) {
127 return context.getConfiguration().getInt(
128 MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
129 }
130
131 /**
132 * Compute the path where the output of a given job attempt will be placed.
133 * @param context the context of the job. This is used to get the
134 * application attempt id.
135 * @return the path to store job attempt data.
136 */
137 public Path getJobAttemptPath(JobContext context) {
138 return getJobAttemptPath(context, getOutputPath());
139 }
140
141 /**
142 * Compute the path where the output of a given job attempt will be placed.
143 * @param context the context of the job. This is used to get the
144 * application attempt id.
145 * @param out the output path to place these in.
146 * @return the path to store job attempt data.
147 */
148 public static Path getJobAttemptPath(JobContext context, Path out) {
149 return getJobAttemptPath(getAppAttemptId(context), out);
150 }
151
152 /**
153 * Compute the path where the output of a given job attempt will be placed.
154 * @param appAttemptId the ID of the application attempt for this job.
155 * @return the path to store job attempt data.
156 */
157 private Path getJobAttemptPath(int appAttemptId) {
158 return getJobAttemptPath(appAttemptId, getOutputPath());
159 }
160
161 /**
162 * Compute the path where the output of a given job attempt will be placed.
163 * @param appAttemptId the ID of the application attempt for this job.
164 * @return the path to store job attempt data.
165 */
166 private static Path getJobAttemptPath(int appAttemptId, Path out) {
167 return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
168 }
169
170 /**
171 * Compute the path where the output of pending task attempts are stored.
172 * @param context the context of the job with pending tasks.
173 * @return the path where the output of pending task attempts are stored.
174 */
175 private Path getPendingTaskAttemptsPath(JobContext context) {
176 return getPendingTaskAttemptsPath(context, getOutputPath());
177 }
178
179 /**
180 * Compute the path where the output of pending task attempts are stored.
181 * @param context the context of the job with pending tasks.
182 * @return the path where the output of pending task attempts are stored.
183 */
184 private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
185 return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
186 }
187
188 /**
189 * Compute the path where the output of a task attempt is stored until
190 * that task is committed.
191 *
192 * @param context the context of the task attempt.
193 * @return the path where a task attempt should be stored.
194 */
195 public Path getTaskAttemptPath(TaskAttemptContext context) {
196 return new Path(getPendingTaskAttemptsPath(context),
197 String.valueOf(context.getTaskAttemptID()));
198 }
199
200 /**
201 * Compute the path where the output of a task attempt is stored until
202 * that task is committed.
203 *
204 * @param context the context of the task attempt.
205 * @param out The output path to put things in.
206 * @return the path where a task attempt should be stored.
207 */
208 public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
209 return new Path(getPendingTaskAttemptsPath(context, out),
210 String.valueOf(context.getTaskAttemptID()));
211 }
212
213 /**
214 * Compute the path where the output of a committed task is stored until
215 * the entire job is committed.
216 * @param context the context of the task attempt
217 * @return the path where the output of a committed task is stored until
218 * the entire job is committed.
219 */
220 public Path getCommittedTaskPath(TaskAttemptContext context) {
221 return getCommittedTaskPath(getAppAttemptId(context), context);
222 }
223
224 public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
225 return getCommittedTaskPath(getAppAttemptId(context), context, out);
226 }
227
228 /**
229 * Compute the path where the output of a committed task is stored until the
230 * entire job is committed for a specific application attempt.
231 * @param appAttemptId the id of the application attempt to use
232 * @param context the context of any task.
233 * @return the path where the output of a committed task is stored.
234 */
235 private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
236 return new Path(getJobAttemptPath(appAttemptId),
237 String.valueOf(context.getTaskAttemptID().getTaskID()));
238 }
239
240 private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
241 return new Path(getJobAttemptPath(appAttemptId, out),
242 String.valueOf(context.getTaskAttemptID().getTaskID()));
243 }
244
245 private static class CommittedTaskFilter implements PathFilter {
246 @Override
247 public boolean accept(Path path) {
248 return !PENDING_DIR_NAME.equals(path.getName());
249 }
250 }
251
252 /**
253 * Get a list of all paths where output from committed tasks are stored.
254 * @param context the context of the current job
255 * @return the list of these Paths/FileStatuses.
256 * @throws IOException
257 */
258 private FileStatus[] getAllCommittedTaskPaths(JobContext context)
259 throws IOException {
260 Path jobAttemptPath = getJobAttemptPath(context);
261 FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
262 return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
263 }
264
265 /**
266 * Get the directory that the task should write results into.
267 * @return the work directory
268 * @throws IOException
269 */
270 public Path getWorkPath() throws IOException {
271 return workPath;
272 }
273
274 /**
275 * Create the temporary directory that is the root of all of the task
276 * work directories.
277 * @param context the job's context
278 */
279 public void setupJob(JobContext context) throws IOException {
280 if (hasOutputPath()) {
281 Path jobAttemptPath = getJobAttemptPath(context);
282 FileSystem fs = jobAttemptPath.getFileSystem(
283 context.getConfiguration());
284 if (!fs.mkdirs(jobAttemptPath)) {
285 LOG.error("Mkdirs failed to create " + jobAttemptPath);
286 }
287 } else {
288 LOG.warn("Output Path is null in setupJob()");
289 }
290 }
291
292 /**
293 * The job has completed so move all committed tasks to the final output dir.
294 * Delete the temporary directory, including all of the work directories.
295 * Create a _SUCCESS file to make it as successful.
296 * @param context the job's context
297 */
298 public void commitJob(JobContext context) throws IOException {
299 if (hasOutputPath()) {
300 Path finalOutput = getOutputPath();
301 FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
302 for(FileStatus stat: getAllCommittedTaskPaths(context)) {
303 mergePaths(fs, stat, finalOutput);
304 }
305
306 // delete the _temporary folder and create a _done file in the o/p folder
307 cleanupJob(context);
308 // True if the job requires output.dir marked on successful job.
309 // Note that by default it is set to true.
310 if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
311 Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
312 fs.create(markerPath).close();
313 }
314 } else {
315 LOG.warn("Output Path is null in commitJob()");
316 }
317 }
318
319 /**
320 * Merge two paths together. Anything in from will be moved into to, if there
321 * are any name conflicts while merging the files or directories in from win.
322 * @param fs the File System to use
323 * @param from the path data is coming from.
324 * @param to the path data is going to.
325 * @throws IOException on any error
326 */
327 private static void mergePaths(FileSystem fs, final FileStatus from,
328 final Path to)
329 throws IOException {
330 LOG.debug("Merging data from "+from+" to "+to);
331 if(from.isFile()) {
332 if(fs.exists(to)) {
333 if(!fs.delete(to, true)) {
334 throw new IOException("Failed to delete "+to);
335 }
336 }
337
338 if(!fs.rename(from.getPath(), to)) {
339 throw new IOException("Failed to rename "+from+" to "+to);
340 }
341 } else if(from.isDirectory()) {
342 if(fs.exists(to)) {
343 FileStatus toStat = fs.getFileStatus(to);
344 if(!toStat.isDirectory()) {
345 if(!fs.delete(to, true)) {
346 throw new IOException("Failed to delete "+to);
347 }
348 if(!fs.rename(from.getPath(), to)) {
349 throw new IOException("Failed to rename "+from+" to "+to);
350 }
351 } else {
352 //It is a directory so merge everything in the directories
353 for(FileStatus subFrom: fs.listStatus(from.getPath())) {
354 Path subTo = new Path(to, subFrom.getPath().getName());
355 mergePaths(fs, subFrom, subTo);
356 }
357 }
358 } else {
359 //it does not exist just rename
360 if(!fs.rename(from.getPath(), to)) {
361 throw new IOException("Failed to rename "+from+" to "+to);
362 }
363 }
364 }
365 }
366
367 @Override
368 @Deprecated
369 public void cleanupJob(JobContext context) throws IOException {
370 if (hasOutputPath()) {
371 Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
372 FileSystem fs = pendingJobAttemptsPath
373 .getFileSystem(context.getConfiguration());
374 fs.delete(pendingJobAttemptsPath, true);
375 } else {
376 LOG.warn("Output Path is null in cleanupJob()");
377 }
378 }
379
380 /**
381 * Delete the temporary directory, including all of the work directories.
382 * @param context the job's context
383 */
384 @Override
385 public void abortJob(JobContext context, JobStatus.State state)
386 throws IOException {
387 // delete the _temporary folder
388 cleanupJob(context);
389 }
390
391 /**
392 * No task setup required.
393 */
394 @Override
395 public void setupTask(TaskAttemptContext context) throws IOException {
396 // FileOutputCommitter's setupTask doesn't do anything. Because the
397 // temporary task directory is created on demand when the
398 // task is writing.
399 }
400
401 /**
402 * Move the files from the work directory to the job output directory
403 * @param context the task context
404 */
405 @Override
406 public void commitTask(TaskAttemptContext context)
407 throws IOException {
408 commitTask(context, null);
409 }
410
411 @Private
412 public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
413 throws IOException {
414 TaskAttemptID attemptId = context.getTaskAttemptID();
415 if (hasOutputPath()) {
416 context.progress();
417 if(taskAttemptPath == null) {
418 taskAttemptPath = getTaskAttemptPath(context);
419 }
420 Path committedTaskPath = getCommittedTaskPath(context);
421 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
422 if (fs.exists(taskAttemptPath)) {
423 if(fs.exists(committedTaskPath)) {
424 if(!fs.delete(committedTaskPath, true)) {
425 throw new IOException("Could not delete " + committedTaskPath);
426 }
427 }
428 if(!fs.rename(taskAttemptPath, committedTaskPath)) {
429 throw new IOException("Could not rename " + taskAttemptPath + " to "
430 + committedTaskPath);
431 }
432 LOG.info("Saved output of task '" + attemptId + "' to " +
433 committedTaskPath);
434 } else {
435 LOG.warn("No Output found for " + attemptId);
436 }
437 } else {
438 LOG.warn("Output Path is null in commitTask()");
439 }
440 }
441
442 /**
443 * Delete the work directory
444 * @throws IOException
445 */
446 @Override
447 public void abortTask(TaskAttemptContext context) throws IOException {
448 abortTask(context, null);
449 }
450
451 @Private
452 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
453 if (hasOutputPath()) {
454 context.progress();
455 if(taskAttemptPath == null) {
456 taskAttemptPath = getTaskAttemptPath(context);
457 }
458 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
459 if(!fs.delete(taskAttemptPath, true)) {
460 LOG.warn("Could not delete "+taskAttemptPath);
461 }
462 } else {
463 LOG.warn("Output Path is null in abortTask()");
464 }
465 }
466
467 /**
468 * Did this task write any files in the work directory?
469 * @param context the task's context
470 */
471 @Override
472 public boolean needsTaskCommit(TaskAttemptContext context
473 ) throws IOException {
474 return needsTaskCommit(context, null);
475 }
476
477 @Private
478 public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
479 ) throws IOException {
480 if(hasOutputPath()) {
481 if(taskAttemptPath == null) {
482 taskAttemptPath = getTaskAttemptPath(context);
483 }
484 FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
485 return fs.exists(taskAttemptPath);
486 }
487 return false;
488 }
489
490 @Override
491 public boolean isRecoverySupported() {
492 return true;
493 }
494
495 @Override
496 public void recoverTask(TaskAttemptContext context)
497 throws IOException {
498 if(hasOutputPath()) {
499 context.progress();
500 TaskAttemptID attemptId = context.getTaskAttemptID();
501 int previousAttempt = getAppAttemptId(context) - 1;
502 if (previousAttempt < 0) {
503 throw new IOException ("Cannot recover task output for first attempt...");
504 }
505
506 Path committedTaskPath = getCommittedTaskPath(context);
507 Path previousCommittedTaskPath = getCommittedTaskPath(
508 previousAttempt, context);
509 FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
510
511 LOG.debug("Trying to recover task from " + previousCommittedTaskPath
512 + " into " + committedTaskPath);
513 if (fs.exists(previousCommittedTaskPath)) {
514 if(fs.exists(committedTaskPath)) {
515 if(!fs.delete(committedTaskPath, true)) {
516 throw new IOException("Could not delete "+committedTaskPath);
517 }
518 }
519 //Rename can fail if the parent directory does not yet exist.
520 Path committedParent = committedTaskPath.getParent();
521 fs.mkdirs(committedParent);
522 if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
523 throw new IOException("Could not rename " + previousCommittedTaskPath +
524 " to " + committedTaskPath);
525 }
526 LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
527 } else {
528 LOG.warn(attemptId+" had no output to recover.");
529 }
530 } else {
531 LOG.warn("Output Path is null in recoverTask()");
532 }
533 }
534 }