001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.mapreduce.JobContext;
031import org.apache.hadoop.mapreduce.OutputCommitter;
032import org.apache.hadoop.mapreduce.TaskAttemptContext;
033import org.apache.hadoop.mapreduce.TaskAttemptID;
034import org.apache.hadoop.mapreduce.TaskID;
035import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
036
037import com.google.common.annotations.VisibleForTesting;
038
039/** An {@link OutputCommitter} that commits files specified
040 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041 **/
042@Checkpointable
043@InterfaceAudience.Public
044@InterfaceStability.Evolving
045public class PartialFileOutputCommitter
046    extends FileOutputCommitter implements PartialOutputCommitter {
047
048  private static final Log LOG =
049    LogFactory.getLog(PartialFileOutputCommitter.class);
050
051
052  public PartialFileOutputCommitter(Path outputPath,
053                             TaskAttemptContext context) throws IOException {
054    super(outputPath, context);
055  }
056
057  public PartialFileOutputCommitter(Path outputPath,
058                             JobContext context) throws IOException {
059    super(outputPath, context);
060  }
061
062  @Override
063  public Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
064    return new Path(getJobAttemptPath(appAttemptId),
065        String.valueOf(context.getTaskAttemptID()));
066  }
067
068  @VisibleForTesting
069  FileSystem fsFor(Path p, Configuration conf) throws IOException {
070    return p.getFileSystem(conf);
071  }
072
073  @Override
074  public void cleanUpPartialOutputForTask(TaskAttemptContext context)
075      throws IOException {
076
077    // we double check this is never invoked from a non-preemptable subclass.
078    // This should never happen, since the invoking codes is checking it too,
079    // but it is safer to double check. Errors handling this would produce
080    // inconsistent output.
081
082    if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
083      throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
084          "from non @Preemptable class");
085    }
086    FileSystem fs =
087      fsFor(getTaskAttemptPath(context), context.getConfiguration());
088
089    LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
090        context.getTaskAttemptID().getTaskID() + " in: " +
091        getCommittedTaskPath(context).getParent());
092
093    final TaskAttemptID taid = context.getTaskAttemptID();
094    final TaskID tid = taid.getTaskID();
095    Path pCommit = getCommittedTaskPath(context).getParent();
096    // remove any committed output
097    for (int i = 0; i < taid.getId(); ++i) {
098      TaskAttemptID oldId = new TaskAttemptID(tid, i);
099      Path pTask = new Path(pCommit, oldId.toString());
100      if (fs.exists(pTask) && !fs.delete(pTask, true)) {
101        throw new IOException("Failed to delete " + pTask);
102      }
103    }
104  }
105
106}