001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.IOException;
022
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.FileSystem;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.mapreduce.JobContext;
031 import org.apache.hadoop.mapreduce.OutputCommitter;
032 import org.apache.hadoop.mapreduce.TaskAttemptContext;
033 import org.apache.hadoop.mapreduce.TaskAttemptID;
034 import org.apache.hadoop.mapreduce.TaskID;
035 import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
036
037 import com.google.common.annotations.VisibleForTesting;
038
039 /** An {@link OutputCommitter} that commits files specified
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042 @Checkpointable
043 @InterfaceAudience.Public
044 @InterfaceStability.Evolving
045 public class PartialFileOutputCommitter
046 extends FileOutputCommitter implements PartialOutputCommitter {
047
048 private static final Log LOG =
049 LogFactory.getLog(PartialFileOutputCommitter.class);
050
051
052 public PartialFileOutputCommitter(Path outputPath,
053 TaskAttemptContext context) throws IOException {
054 super(outputPath, context);
055 }
056
057 public PartialFileOutputCommitter(Path outputPath,
058 JobContext context) throws IOException {
059 super(outputPath, context);
060 }
061
062 @Override
063 public Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
064 return new Path(getJobAttemptPath(appAttemptId),
065 String.valueOf(context.getTaskAttemptID()));
066 }
067
068 @VisibleForTesting
069 FileSystem fsFor(Path p, Configuration conf) throws IOException {
070 return p.getFileSystem(conf);
071 }
072
073 @Override
074 public void cleanUpPartialOutputForTask(TaskAttemptContext context)
075 throws IOException {
076
077 // we double check this is never invoked from a non-preemptable subclass.
078 // This should never happen, since the invoking codes is checking it too,
079 // but it is safer to double check. Errors handling this would produce
080 // inconsistent output.
081
082 if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
083 throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
084 "from non @Preemptable class");
085 }
086 FileSystem fs =
087 fsFor(getTaskAttemptPath(context), context.getConfiguration());
088
089 LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
090 context.getTaskAttemptID().getTaskID() + " in: " +
091 getCommittedTaskPath(context).getParent());
092
093 final TaskAttemptID taid = context.getTaskAttemptID();
094 final TaskID tid = taid.getTaskID();
095 Path pCommit = getCommittedTaskPath(context).getParent();
096 // remove any committed output
097 for (int i = 0; i < taid.getId(); ++i) {
098 TaskAttemptID oldId = new TaskAttemptID(tid, i);
099 Path pTask = new Path(pCommit, oldId.toString());
100 if (fs.exists(pTask) && !fs.delete(pTask, true)) {
101 throw new IOException("Failed to delete " + pTask);
102 }
103 }
104 }
105
106 }