001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.jobcontrol; 020 021 022 import java.io.IOException; 023 import java.util.ArrayList; 024 import java.util.List; 025 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.hadoop.classification.InterfaceAudience; 029 import org.apache.hadoop.classification.InterfaceStability; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.hadoop.mapreduce.Job; 034 import org.apache.hadoop.mapreduce.JobID; 035 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 036 import org.apache.hadoop.util.StringUtils; 037 038 /** 039 * This class encapsulates a MapReduce job and its dependency. It monitors 040 * the states of the depending jobs and updates the state of this job. 041 * A job starts in the WAITING state. If it does not have any depending jobs, 042 * or all of the depending jobs are in SUCCESS state, then the job state 043 * will become READY. If any depending jobs fail, the job will fail too. 044 * When in READY state, the job can be submitted to Hadoop for execution, with 045 * the state changing into RUNNING state. From RUNNING state, the job 046 * can get into SUCCESS or FAILED state, depending 047 * the status of the job execution. 048 */ 049 @InterfaceAudience.Public 050 @InterfaceStability.Evolving 051 public class ControlledJob { 052 private static final Log LOG = LogFactory.getLog(ControlledJob.class); 053 054 // A job will be in one of the following states 055 public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED, 056 DEPENDENT_FAILED}; 057 public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; 058 private State state; 059 private String controlID; // assigned and used by JobControl class 060 private Job job; // mapreduce job to be executed. 061 // some info for human consumption, e.g. the reason why the job failed 062 private String message; 063 // the jobs the current job depends on 064 private List<ControlledJob> dependingJobs; 065 066 /** 067 * Construct a job. 068 * @param job a mapreduce job to be executed. 069 * @param dependingJobs an array of jobs the current job depends on 070 */ 071 public ControlledJob(Job job, List<ControlledJob> dependingJobs) 072 throws IOException { 073 this.job = job; 074 this.dependingJobs = dependingJobs; 075 this.state = State.WAITING; 076 this.controlID = "unassigned"; 077 this.message = "just initialized"; 078 } 079 080 /** 081 * Construct a job. 082 * 083 * @param conf mapred job configuration representing a job to be executed. 084 * @throws IOException 085 */ 086 public ControlledJob(Configuration conf) throws IOException { 087 this(new Job(conf), null); 088 } 089 090 @Override 091 public String toString() { 092 StringBuffer sb = new StringBuffer(); 093 sb.append("job name:\t").append(this.job.getJobName()).append("\n"); 094 sb.append("job id:\t").append(this.controlID).append("\n"); 095 sb.append("job state:\t").append(this.state).append("\n"); 096 sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n"); 097 sb.append("job message:\t").append(this.message).append("\n"); 098 099 if (this.dependingJobs == null || this.dependingJobs.size() == 0) { 100 sb.append("job has no depending job:\t").append("\n"); 101 } else { 102 sb.append("job has ").append(this.dependingJobs.size()). 103 append(" dependeng jobs:\n"); 104 for (int i = 0; i < this.dependingJobs.size(); i++) { 105 sb.append("\t depending job ").append(i).append(":\t"); 106 sb.append((this.dependingJobs.get(i)).getJobName()).append("\n"); 107 } 108 } 109 return sb.toString(); 110 } 111 112 /** 113 * @return the job name of this job 114 */ 115 public String getJobName() { 116 return job.getJobName(); 117 } 118 119 /** 120 * Set the job name for this job. 121 * @param jobName the job name 122 */ 123 public void setJobName(String jobName) { 124 job.setJobName(jobName); 125 } 126 127 /** 128 * @return the job ID of this job assigned by JobControl 129 */ 130 public String getJobID() { 131 return this.controlID; 132 } 133 134 /** 135 * Set the job ID for this job. 136 * @param id the job ID 137 */ 138 public void setJobID(String id) { 139 this.controlID = id; 140 } 141 142 /** 143 * @return the mapred ID of this job as assigned by the mapred framework. 144 */ 145 public synchronized JobID getMapredJobId() { 146 return this.job.getJobID(); 147 } 148 149 /** 150 * @return the mapreduce job 151 */ 152 public synchronized Job getJob() { 153 return this.job; 154 } 155 156 /** 157 * Set the mapreduce job 158 * @param job the mapreduce job for this job. 159 */ 160 public synchronized void setJob(Job job) { 161 this.job = job; 162 } 163 164 /** 165 * @return the state of this job 166 */ 167 public synchronized State getJobState() { 168 return this.state; 169 } 170 171 /** 172 * Set the state for this job. 173 * @param state the new state for this job. 174 */ 175 protected synchronized void setJobState(State state) { 176 this.state = state; 177 } 178 179 /** 180 * @return the message of this job 181 */ 182 public synchronized String getMessage() { 183 return this.message; 184 } 185 186 /** 187 * Set the message for this job. 188 * @param message the message for this job. 189 */ 190 public synchronized void setMessage(String message) { 191 this.message = message; 192 } 193 194 /** 195 * @return the depending jobs of this job 196 */ 197 public List<ControlledJob> getDependentJobs() { 198 return this.dependingJobs; 199 } 200 201 /** 202 * Add a job to this jobs' dependency list. 203 * Dependent jobs can only be added while a Job 204 * is waiting to run, not during or afterwards. 205 * 206 * @param dependingJob Job that this Job depends on. 207 * @return <tt>true</tt> if the Job was added. 208 */ 209 public synchronized boolean addDependingJob(ControlledJob dependingJob) { 210 if (this.state == State.WAITING) { //only allowed to add jobs when waiting 211 if (this.dependingJobs == null) { 212 this.dependingJobs = new ArrayList<ControlledJob>(); 213 } 214 return this.dependingJobs.add(dependingJob); 215 } else { 216 return false; 217 } 218 } 219 220 /** 221 * @return true if this job is in a complete state 222 */ 223 public synchronized boolean isCompleted() { 224 return this.state == State.FAILED || 225 this.state == State.DEPENDENT_FAILED || 226 this.state == State.SUCCESS; 227 } 228 229 /** 230 * @return true if this job is in READY state 231 */ 232 public synchronized boolean isReady() { 233 return this.state == State.READY; 234 } 235 236 public void killJob() throws IOException, InterruptedException { 237 job.killJob(); 238 } 239 240 public synchronized void failJob(String message) throws IOException, InterruptedException { 241 try { 242 if(job != null && this.state == State.RUNNING) { 243 job.killJob(); 244 } 245 } finally { 246 this.state = State.FAILED; 247 this.message = message; 248 } 249 } 250 251 /** 252 * Check the state of this running job. The state may 253 * remain the same, become SUCCESS or FAILED. 254 */ 255 private void checkRunningState() throws IOException, InterruptedException { 256 try { 257 if (job.isComplete()) { 258 if (job.isSuccessful()) { 259 this.state = State.SUCCESS; 260 } else { 261 this.state = State.FAILED; 262 this.message = "Job failed!"; 263 } 264 } 265 } catch (IOException ioe) { 266 this.state = State.FAILED; 267 this.message = StringUtils.stringifyException(ioe); 268 try { 269 if (job != null) { 270 job.killJob(); 271 } 272 } catch (IOException e) {} 273 } 274 } 275 276 /** 277 * Check and update the state of this job. The state changes 278 * depending on its current state and the states of the depending jobs. 279 */ 280 synchronized State checkState() throws IOException, InterruptedException { 281 if (this.state == State.RUNNING) { 282 checkRunningState(); 283 } 284 if (this.state != State.WAITING) { 285 return this.state; 286 } 287 if (this.dependingJobs == null || this.dependingJobs.size() == 0) { 288 this.state = State.READY; 289 return this.state; 290 } 291 ControlledJob pred = null; 292 int n = this.dependingJobs.size(); 293 for (int i = 0; i < n; i++) { 294 pred = this.dependingJobs.get(i); 295 State s = pred.checkState(); 296 if (s == State.WAITING || s == State.READY || s == State.RUNNING) { 297 break; // a pred is still not completed, continue in WAITING 298 // state 299 } 300 if (s == State.FAILED || s == State.DEPENDENT_FAILED) { 301 this.state = State.DEPENDENT_FAILED; 302 this.message = "depending job " + i + " with jobID " 303 + pred.getJobID() + " failed. " + pred.getMessage(); 304 break; 305 } 306 // pred must be in success state 307 if (i == n - 1) { 308 this.state = State.READY; 309 } 310 } 311 312 return this.state; 313 } 314 315 /** 316 * Submit this job to mapred. The state becomes RUNNING if submission 317 * is successful, FAILED otherwise. 318 */ 319 protected synchronized void submit() { 320 try { 321 Configuration conf = job.getConfiguration(); 322 if (conf.getBoolean(CREATE_DIR, false)) { 323 FileSystem fs = FileSystem.get(conf); 324 Path inputPaths[] = FileInputFormat.getInputPaths(job); 325 for (int i = 0; i < inputPaths.length; i++) { 326 if (!fs.exists(inputPaths[i])) { 327 try { 328 fs.mkdirs(inputPaths[i]); 329 } catch (IOException e) { 330 331 } 332 } 333 } 334 } 335 job.submit(); 336 this.state = State.RUNNING; 337 } catch (Exception ioe) { 338 LOG.info(getJobName()+" got an error while submitting ",ioe); 339 this.state = State.FAILED; 340 this.message = StringUtils.stringifyException(ioe); 341 } 342 } 343 344 }