/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;

/** An {@link OutputCommitter} that commits task output files to the job
 * output directory, i.e. ${mapreduce.output.fileoutputformat.outputdir}.
 * All operations are delegated to a
 * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter}
 * created for that output path.
 **/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {

  public static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.mapred.FileOutputCommitter");

  /**
   * Temporary directory name
   */
  public static final String TEMP_DIR_NAME =
    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
  /**
   * Name of the marker file written to the output directory on successful
   * job commit.
   */
  public static final String SUCCEEDED_FILE_NAME =
    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
  /**
   * Configuration key that controls whether the success marker file is
   * written on job commit.
   */
  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
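
  /*
   * Usage sketch (illustrative only): this committer is normally the default
   * OutputCommitter for the old-style mapred API, so it is picked up
   * automatically once a file output path is configured. The driver class
   * and paths below are hypothetical.
   *
   *   JobConf conf = new JobConf(MyDriver.class);                  // hypothetical driver class
   *   conf.setOutputFormat(TextOutputFormat.class);
   *   FileOutputFormat.setOutputPath(conf, new Path("/out/data")); // example output directory
   *   // Explicit for illustration; FileOutputCommitter is typically the default.
   *   conf.setOutputCommitter(FileOutputCommitter.class);
   *   JobClient.runJob(conf);
   */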

  private static Path getOutputPath(JobContext context) {
    JobConf conf = context.getJobConf();
    return FileOutputFormat.getOutputPath(conf);
  }

  private static Path getOutputPath(TaskAttemptContext context) {
    JobConf conf = context.getJobConf();
    return FileOutputFormat.getOutputPath(conf);
  }

  /** The new-API committer that all operations are delegated to; created
   * lazily from the configured output path. */
  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;

  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  getWrapped(JobContext context) throws IOException {
    if (wrapped == null) {
      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
          getOutputPath(context), context);
    }
    return wrapped;
  }

  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  getWrapped(TaskAttemptContext context) throws IOException {
    if (wrapped == null) {
      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
          getOutputPath(context), context);
    }
    return wrapped;
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt id.
   * @return the path to store job attempt data.
   */
  @Private
  Path getJobAttemptPath(JobContext context) {
    Path out = getOutputPath(context);
    return out == null ? null :
      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getJobAttemptPath(context, out);
  }

  @Private
  public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
    Path out = getOutputPath(context);
    return out == null ? null : getTaskAttemptPath(context, out);
  }

  private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
    Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
    if (workPath == null && out != null) {
      return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
          .getTaskAttemptPath(context, out);
    }
    return workPath;
  }

  /**
   * Compute the path where the output of a committed task is stored until
   * the entire job is committed.
   * @param context the context of the task attempt
   * @return the path where the output of a committed task is stored until
   * the entire job is committed.
   */
  @Private
  Path getCommittedTaskPath(TaskAttemptContext context) {
    Path out = getOutputPath(context);
    return out == null ? null :
      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getCommittedTaskPath(context, out);
  }
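
  /*
   * For orientation only: with the wrapped committer's default layout these
   * paths typically resolve as shown below (the exact layout depends on the
   * wrapped committer's version and configuration):
   *
   *   ${outputdir}/_temporary/${appAttemptId}                               job attempt path
   *   ${outputdir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}   task attempt (work) path
   *   ${outputdir}/_temporary/${appAttemptId}/${taskId}                     committed task path
   */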

  /**
   * Get the directory that a task should write its in-progress output into.
   * @param context the context of the task attempt
   * @param outputPath the job's output directory
   * @return the task's work path, or null if no output path is set
   */
  public Path getWorkPath(TaskAttemptContext context, Path outputPath)
  throws IOException {
    return outputPath == null ? null : getTaskAttemptPath(context, outputPath);
  }
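
  /*
   * Side-file sketch (illustrative only): a running task can write extra
   * output under the work path returned by FileOutputFormat#getWorkOutputPath,
   * and the committer promotes it to the output directory along with the
   * regular task output when the task commits. The file name below is
   * hypothetical.
   *
   *   // inside a running map or reduce task, with its JobConf "job":
   *   Path workDir = FileOutputFormat.getWorkOutputPath(job);
   *   FileSystem fs = workDir.getFileSystem(job);
   *   FSDataOutputStream out = fs.create(new Path(workDir, "side-file.txt")); // hypothetical name
   *   out.writeBytes("extra output\n");
   *   out.close();
   */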

  @Override
  public void setupJob(JobContext context) throws IOException {
    getWrapped(context).setupJob(context);
  }

  @Override
  public void commitJob(JobContext context) throws IOException {
    getWrapped(context).commitJob(context);
  }

  @Override
  @Deprecated
  public void cleanupJob(JobContext context) throws IOException {
    getWrapped(context).cleanupJob(context);
  }

  @Override
  public void abortJob(JobContext context, int runState)
  throws IOException {
    // Translate the old-API integer run state into the new-API enum
    // before delegating to the wrapped committer.
    JobStatus.State state;
    if (runState == JobStatus.State.RUNNING.getValue()) {
      state = JobStatus.State.RUNNING;
    } else if (runState == JobStatus.State.SUCCEEDED.getValue()) {
      state = JobStatus.State.SUCCEEDED;
    } else if (runState == JobStatus.State.FAILED.getValue()) {
      state = JobStatus.State.FAILED;
    } else if (runState == JobStatus.State.PREP.getValue()) {
      state = JobStatus.State.PREP;
    } else if (runState == JobStatus.State.KILLED.getValue()) {
      state = JobStatus.State.KILLED;
    } else {
      throw new IllegalArgumentException(runState + " is not a valid runState.");
    }
    getWrapped(context).abortJob(context, state);
  }

  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    getWrapped(context).setupTask(context);
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    getWrapped(context).commitTask(context, getTaskAttemptPath(context));
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    getWrapped(context).abortTask(context, getTaskAttemptPath(context));
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context)
  throws IOException {
    return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
  }

  @Override
  @Deprecated
  public boolean isRecoverySupported() {
    return true;
  }

  @Override
  public boolean isRecoverySupported(JobContext context) throws IOException {
    return getWrapped(context).isRecoverySupported(context);
  }

  @Override
  public void recoverTask(TaskAttemptContext context)
      throws IOException {
    getWrapped(context).recoverTask(context);
  }
}