/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.jobcontrol;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;

/**
 *  This class encapsulates a MapReduce job and its dependency. It monitors
 *  the states of the depending jobs and updates the state of this job.
 *  A job starts in the WAITING state. If it does not have any depending jobs,
 *  or all of the depending jobs are in SUCCESS state, then the job state
 *  will become READY. If any depending jobs fail, the job will fail too
 *  (it moves to the DEPENDENT_FAILED state).
 *  When in READY state, the job can be submitted to Hadoop for execution, with
 *  the state changing into RUNNING state. From RUNNING state, the job
 *  can get into SUCCESS or FAILED state, depending on
 *  the status of the job execution.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ControlledJob {
  private static final Log LOG = LogFactory.getLog(ControlledJob.class);

  /** A job will be in one of the following states. */
  public enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
                     DEPENDENT_FAILED}

  /**
   * Configuration key read by {@link #submit()}: when set to true, input
   * paths that do not exist are created before the job is submitted.
   */
  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";

  private State state;
  private String controlID;     // assigned and used by JobControl class
  private Job job;              // mapreduce job to be executed.
  // some info for human consumption, e.g. the reason why the job failed
  private String message;
  // the jobs the current job depends on; may be null when there are none
  private List<ControlledJob> dependingJobs;

  /**
   * Construct a job.
   * @param job a mapreduce job to be executed.
   * @param dependingJobs a list of jobs the current job depends on; may be
   *        null. The list is stored as given, not copied.
   * @throws IOException declared for callers; not thrown by this body
   */
  public ControlledJob(Job job, List<ControlledJob> dependingJobs)
      throws IOException {
    this.job = job;
    this.dependingJobs = dependingJobs;
    this.state = State.WAITING;
    this.controlID = "unassigned";
    this.message = "just initialized";
  }

  /**
   * Construct a job with no depending jobs.
   *
   * @param conf mapred job configuration representing a job to be executed.
   * @throws IOException if the underlying {@link Job} cannot be created
   */
  public ControlledJob(Configuration conf) throws IOException {
    this(new Job(conf), null);
  }

  /**
   * @return a multi-line, human readable description of this job: its name,
   *         control ID, state, mapred ID, message and depending jobs
   */
  @Override
  public String toString() {
    // The buffer never leaves this method, so the unsynchronized
    // StringBuilder is sufficient.
    StringBuilder sb = new StringBuilder();
    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
    sb.append("job id:\t").append(this.controlID).append("\n");
    sb.append("job state:\t").append(this.state).append("\n");
    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
    sb.append("job message:\t").append(this.message).append("\n");

    if (this.dependingJobs == null || this.dependingJobs.isEmpty()) {
      sb.append("job has no depending job:\t").append("\n");
    } else {
      sb.append("job has ").append(this.dependingJobs.size())
          .append(" depending jobs:\n");
      for (int i = 0; i < this.dependingJobs.size(); i++) {
        sb.append("\t depending job ").append(i).append(":\t");
        sb.append(this.dependingJobs.get(i).getJobName()).append("\n");
      }
    }
    return sb.toString();
  }

  /**
   * @return the job name of this job
   */
  public String getJobName() {
    return job.getJobName();
  }

  /**
   * Set the job name for this job.
   * @param jobName the job name
   */
  public void setJobName(String jobName) {
    job.setJobName(jobName);
  }

  /**
   * @return the job ID of this job assigned by JobControl
   */
  public String getJobID() {
    return this.controlID;
  }

  /**
   * Set the job ID for this job.
   * @param id the job ID
   */
  public void setJobID(String id) {
    this.controlID = id;
  }

  /**
   * @return the mapred ID of this job as assigned by the
   * mapred framework.
   */
  public JobID getMapredJobID() {
    return this.job.getJobID();
  }

  /**
   * @return the mapreduce job
   */
  public synchronized Job getJob() {
    return this.job;
  }

  /**
   * Set the mapreduce job
   * @param job the mapreduce job for this job.
   */
  public synchronized void setJob(Job job) {
    this.job = job;
  }

  /**
   * @return the state of this job
   */
  public synchronized State getJobState() {
    return this.state;
  }

  /**
   * Set the state for this job.
   * @param state the new state for this job.
   */
  protected synchronized void setJobState(State state) {
    this.state = state;
  }

  /**
   * @return the message of this job
   */
  public synchronized String getMessage() {
    return this.message;
  }

  /**
   * Set the message for this job.
   * @param message the message for this job.
   */
  public synchronized void setMessage(String message) {
    this.message = message;
  }

  /**
   * @return the depending jobs of this job. This is the internal list
   *         itself (not a copy) and is null when no depending job was
   *         supplied at construction time or added afterwards.
   */
  public List<ControlledJob> getDependentJobs() {
    return this.dependingJobs;
  }

  /**
   * Add a job to this jobs' dependency list.
   * Dependent jobs can only be added while a Job
   * is waiting to run, not during or afterwards.
   *
   * @param dependingJob Job that this Job depends on.
   * @return {@code true} if the Job was added.
   */
  public synchronized boolean addDependingJob(ControlledJob dependingJob) {
    if (this.state != State.WAITING) {
      // only allowed to add jobs when waiting
      return false;
    }
    if (this.dependingJobs == null) {
      this.dependingJobs = new ArrayList<ControlledJob>();
    }
    return this.dependingJobs.add(dependingJob);
  }

  /**
   * @return true if this job is in a complete state
   */
  public synchronized boolean isCompleted() {
    return this.state == State.FAILED ||
      this.state == State.DEPENDENT_FAILED ||
      this.state == State.SUCCESS;
  }

  /**
   * @return true if this job is in READY state
   */
  public synchronized boolean isReady() {
    return this.state == State.READY;
  }

  /**
   * Kill the underlying mapreduce job. The state and message of this
   * ControlledJob are not modified by this call.
   *
   * @throws IOException if the kill request fails
   * @throws InterruptedException if the kill request is interrupted
   */
  public void killJob() throws IOException, InterruptedException {
    job.killJob();
  }

  /**
   * Mark this job as FAILED with the given message. If the job is currently
   * RUNNING, the underlying mapreduce job is killed first; the state and
   * message are updated even when that kill attempt throws.
   *
   * @param message the reason for the failure
   * @throws IOException if killing the running job fails
   * @throws InterruptedException if killing the running job is interrupted
   */
  public synchronized void failJob(String message) throws IOException, InterruptedException {
    try {
      if (job != null && this.state == State.RUNNING) {
        job.killJob();
      }
    } finally {
      this.state = State.FAILED;
      this.message = message;
    }
  }

  /**
   * Check the state of this running job. The state may
   * remain the same, become SUCCESS or FAILED.
   */
  private void checkRunningState() throws IOException, InterruptedException {
    try {
      if (job.isComplete()) {
        if (job.isSuccessful()) {
          this.state = State.SUCCESS;
        } else {
          this.state = State.FAILED;
          this.message = "Job failed!";
        }
      }
    } catch (IOException ioe) {
      this.state = State.FAILED;
      this.message = StringUtils.stringifyException(ioe);
      try {
        if (job != null) {
          job.killJob();
        }
      } catch (IOException e) {
        // Best effort: the job is already marked FAILED, so the kill
        // failure is only reported rather than propagated.
        LOG.warn("Failed to kill job " + getJobName()
            + " after its status check failed", e);
      }
    }
  }

  /**
   * Check and update the state of this job. The state changes
   * depending on its current state and the states of the depending jobs.
   *
   * @return the state of this job after the check
   */
  synchronized State checkState() throws IOException, InterruptedException {
    if (this.state == State.RUNNING) {
      checkRunningState();
    }
    if (this.state != State.WAITING) {
      return this.state;
    }
    if (this.dependingJobs == null || this.dependingJobs.isEmpty()) {
      this.state = State.READY;
      return this.state;
    }
    int n = this.dependingJobs.size();
    for (int i = 0; i < n; i++) {
      ControlledJob pred = this.dependingJobs.get(i);
      State s = pred.checkState();
      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
        // a pred is still not completed, continue in WAITING state
        break;
      }
      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
        this.state = State.DEPENDENT_FAILED;
        this.message = "depending job " + i + " with jobID "
          + pred.getJobID() + " failed. " + pred.getMessage();
        break;
      }
      // pred must be in success state; once the last one has been seen,
      // every depending job succeeded
      if (i == n - 1) {
        this.state = State.READY;
      }
    }

    return this.state;
  }

  /**
   * Submit this job to mapred. The state becomes RUNNING if submission
   * is successful, FAILED otherwise. When {@link #CREATE_DIR} is set to true
   * in the job configuration, missing input paths are created first on a
   * best-effort basis.
   */
  protected synchronized void submit() {
    try {
      Configuration conf = job.getConfiguration();
      if (conf.getBoolean(CREATE_DIR, false)) {
        FileSystem fs = FileSystem.get(conf);
        Path[] inputPaths = FileInputFormat.getInputPaths(job);
        for (Path inputPath : inputPaths) {
          if (!fs.exists(inputPath)) {
            try {
              fs.mkdirs(inputPath);
            } catch (IOException e) {
              // Best effort: still attempt the submission, but leave a
              // trace of why the directory is missing.
              LOG.warn("Could not create input path " + inputPath
                  + " for job " + getJobName(), e);
            }
          }
        }
      }
      job.submit();
      this.state = State.RUNNING;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // The broad catch would otherwise hide the interruption from the
        // calling thread; restore the flag.
        Thread.currentThread().interrupt();
      }
      LOG.info(getJobName()+" got an error while submitting ", e);
      this.state = State.FAILED;
      this.message = StringUtils.stringifyException(e);
    }
  }

}