001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.IOException; 022 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceAudience.Private; 027 import org.apache.hadoop.classification.InterfaceStability; 028 import org.apache.hadoop.fs.Path; 029 030 /** An {@link OutputCommitter} that commits files specified 031 * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 032 **/ 033 @InterfaceAudience.Public 034 @InterfaceStability.Stable 035 public class FileOutputCommitter extends OutputCommitter { 036 037 public static final Log LOG = LogFactory.getLog( 038 "org.apache.hadoop.mapred.FileOutputCommitter"); 039 040 /** 041 * Temporary directory name 042 */ 043 public static final String TEMP_DIR_NAME = 044 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME; 045 public static final String SUCCEEDED_FILE_NAME = 046 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME; 047 static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 048 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER; 049 050 private static Path getOutputPath(JobContext context) { 051 JobConf conf = context.getJobConf(); 052 return FileOutputFormat.getOutputPath(conf); 053 } 054 055 private static Path getOutputPath(TaskAttemptContext context) { 056 JobConf conf = context.getJobConf(); 057 return FileOutputFormat.getOutputPath(conf); 058 } 059 060 private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null; 061 062 private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 063 getWrapped(JobContext context) throws IOException { 064 if(wrapped == null) { 065 wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter( 066 getOutputPath(context), context); 067 } 068 return wrapped; 069 } 070 071 private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 072 getWrapped(TaskAttemptContext context) throws IOException { 073 if(wrapped == null) { 074 wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter( 075 getOutputPath(context), context); 076 } 077 return wrapped; 078 } 079 080 /** 081 * Compute the path where the output of a given job attempt will be placed. 082 * @param context the context of the job. This is used to get the 083 * application attempt id. 084 * @return the path to store job attempt data. 085 */ 086 @Private 087 Path getJobAttemptPath(JobContext context) { 088 Path out = getOutputPath(context); 089 return out == null ? null : 090 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 091 .getJobAttemptPath(context, out); 092 } 093 094 @Private 095 public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException { 096 Path out = getOutputPath(context); 097 return out == null ? null : getTaskAttemptPath(context, out); 098 } 099 100 private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException { 101 Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf()); 102 if(workPath == null && out != null) { 103 return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 104 .getTaskAttemptPath(context, out); 105 } 106 return workPath; 107 } 108 109 /** 110 * Compute the path where the output of a committed task is stored until 111 * the entire job is committed. 112 * @param context the context of the task attempt 113 * @return the path where the output of a committed task is stored until 114 * the entire job is committed. 115 */ 116 @Private 117 Path getCommittedTaskPath(TaskAttemptContext context) { 118 Path out = getOutputPath(context); 119 return out == null ? null : 120 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 121 .getCommittedTaskPath(context, out); 122 } 123 124 public Path getWorkPath(TaskAttemptContext context, Path outputPath) 125 throws IOException { 126 return outputPath == null ? null : getTaskAttemptPath(context, outputPath); 127 } 128 129 @Override 130 public void setupJob(JobContext context) throws IOException { 131 getWrapped(context).setupJob(context); 132 } 133 134 @Override 135 public void commitJob(JobContext context) throws IOException { 136 getWrapped(context).commitJob(context); 137 } 138 139 @Override 140 @Deprecated 141 public void cleanupJob(JobContext context) throws IOException { 142 getWrapped(context).cleanupJob(context); 143 } 144 145 @Override 146 public void abortJob(JobContext context, int runState) 147 throws IOException { 148 JobStatus.State state; 149 if(runState == JobStatus.State.RUNNING.getValue()) { 150 state = JobStatus.State.RUNNING; 151 } else if(runState == JobStatus.State.SUCCEEDED.getValue()) { 152 state = JobStatus.State.SUCCEEDED; 153 } else if(runState == JobStatus.State.FAILED.getValue()) { 154 state = JobStatus.State.FAILED; 155 } else if(runState == JobStatus.State.PREP.getValue()) { 156 state = JobStatus.State.PREP; 157 } else if(runState == JobStatus.State.KILLED.getValue()) { 158 state = JobStatus.State.KILLED; 159 } else { 160 throw new IllegalArgumentException(runState+" is not a valid runState."); 161 } 162 getWrapped(context).abortJob(context, state); 163 } 164 165 @Override 166 public void setupTask(TaskAttemptContext context) throws IOException { 167 getWrapped(context).setupTask(context); 168 } 169 170 @Override 171 public void commitTask(TaskAttemptContext context) throws IOException { 172 getWrapped(context).commitTask(context, getTaskAttemptPath(context)); 173 } 174 175 @Override 176 public void abortTask(TaskAttemptContext context) throws IOException { 177 getWrapped(context).abortTask(context, getTaskAttemptPath(context)); 178 } 179 180 @Override 181 public boolean needsTaskCommit(TaskAttemptContext context) 182 throws IOException { 183 return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context)); 184 } 185 186 @Override 187 public boolean isRecoverySupported() { 188 return true; 189 } 190 191 @Override 192 public void recoverTask(TaskAttemptContext context) 193 throws IOException { 194 getWrapped(context).recoverTask(context); 195 } 196 }