/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.jobcontrol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
import org.apache.hadoop.util.StringUtils;

/**
 * This class encapsulates a set of MapReduce jobs and their dependencies.
 *
 * It tracks the jobs in three lists: jobs still in progress, jobs that
 * succeeded and jobs that failed.
 *
 * This class provides APIs for the client app to add a job to the group
 * and to get the jobs in the group in different states. When a job is
 * added, an ID unique to the group is assigned to the job.
 *
 * This class is a {@link Runnable} whose {@link #run()} loop submits jobs
 * when they become ready, checks the states of the jobs in progress and
 * moves finished jobs to the successful or failed list. The class
 * provides APIs for suspending/resuming that loop, and for stopping it.
 *
 * The job lists are guarded by this object's monitor; the runner state is
 * held in an {@link AtomicReference} so that it can be read and changed
 * from any thread without taking that monitor.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JobControl implements Runnable {
  private static final Log LOG = LogFactory.getLog(JobControl.class);

  /** Pause, in milliseconds, between two passes of the run loop. */
  private static final long POLL_INTERVAL_MS = 5000;

  // The thread can be in one of the following states
  public static enum ThreadState {RUNNING, SUSPENDED, STOPPED, STOPPING, READY}

  // The state of the runner. It is written by client threads (stop/suspend/
  // resume) and read by the runner thread outside any lock, hence atomic.
  private final AtomicReference<ThreadState> runnerState =
      new AtomicReference<ThreadState>(ThreadState.READY);

  // All three lists are only touched while holding this object's monitor.
  private final LinkedList<ControlledJob> jobsInProgress =
      new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> successfulJobs =
      new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> failedJobs =
      new LinkedList<ControlledJob>();

  // Starts at -1 so that the first ID handed out ends in 0.
  private long nextJobID;
  private final String groupName;

  /**
   * Construct a job control for a group of jobs.
   * @param groupName a name identifying this group
   */
  public JobControl(String groupName) {
    this.nextJobID = -1;
    this.groupName = groupName;
  }

  /**
   * Copy the given jobs into a new list so that callers never see (or
   * modify) the internal lists.
   */
  private static List<ControlledJob> toList(
      Collection<ControlledJob> jobs) {
    return new ArrayList<ControlledJob>(jobs);
  }

  /**
   * @param state the state to look for
   * @return a new list with the in-progress jobs whose
   *         {@link ControlledJob#getJobState()} is the given state
   */
  synchronized private List<ControlledJob> getJobsIn(State state) {
    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
    for (ControlledJob j : jobsInProgress) {
      if (j.getJobState() == state) {
        l.add(j);
      }
    }
    return l;
  }

  /**
   * @return the jobs in the waiting state
   */
  public List<ControlledJob> getWaitingJobList() {
    return getJobsIn(State.WAITING);
  }

  /**
   * @return the jobs in the running state
   */
  public List<ControlledJob> getRunningJobList() {
    return getJobsIn(State.RUNNING);
  }

  /**
   * @return the jobs in the ready state
   */
  public List<ControlledJob> getReadyJobsList() {
    return getJobsIn(State.READY);
  }

  /**
   * @return the jobs in the success state
   */
  synchronized public List<ControlledJob> getSuccessfulJobList() {
    return toList(this.successfulJobs);
  }

  /**
   * @return the jobs that were moved to the failed list
   */
  synchronized public List<ControlledJob> getFailedJobList() {
    return toList(this.failedJobs);
  }

  /**
   * Build the next job ID: the group name followed by a counter.
   * Only called from the synchronized {@link #addJob(ControlledJob)}.
   */
  private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
  }

  /**
   * Add a new controlled job. The job is given a fresh ID, put into the
   * WAITING state and appended to the in-progress list.
   * @param aJob the new controlled job
   * @return the ID assigned to the job
   */
  synchronized public String addJob(ControlledJob aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setJobState(State.WAITING);
    jobsInProgress.add(aJob);
    return id;
  }

  /**
   * Add a new job.
   * @param aJob the new job
   * @return the ID assigned to the job
   */
  synchronized public String addJob(Job aJob) {
    // The cast selects the ControlledJob overload; without it this call
    // would resolve to this very method and recurse forever.
    return addJob((ControlledJob) aJob);
  }

  /**
   * Add a collection of jobs
   *
   * @param jobs the jobs to add; each one is passed to
   *             {@link #addJob(ControlledJob)}
   */
  public void addJobCollection(Collection<ControlledJob> jobs) {
    for (ControlledJob job : jobs) {
      addJob(job);
    }
  }

  /**
   * @return the thread state
   */
  public ThreadState getThreadState() {
    return runnerState.get();
  }

  /**
   * set the thread state to STOPPING so that the
   * thread will stop when it wakes up.
   */
  public void stop() {
    runnerState.set(ThreadState.STOPPING);
  }

  /**
   * suspend the running thread
   */
  public void suspend () {
    // Atomic RUNNING -> SUSPENDED; a concurrent stop() is never overwritten.
    runnerState.compareAndSet(ThreadState.RUNNING, ThreadState.SUSPENDED);
  }

  /**
   * resume the suspended thread
   */
  public void resume () {
    // Atomic SUSPENDED -> RUNNING; a concurrent stop() is never overwritten.
    runnerState.compareAndSet(ThreadState.SUSPENDED, ThreadState.RUNNING);
  }

  /**
   * @return true if no job is left in the in-progress list
   */
  synchronized public boolean allFinished() {
    return jobsInProgress.isEmpty();
  }

  /**
   * @return true while the run loop should keep going, i.e. the runner
   *         state is RUNNING or SUSPENDED
   */
  private boolean isRunnerActive() {
    ThreadState current = runnerState.get();
    return current == ThreadState.RUNNING
        || current == ThreadState.SUSPENDED;
  }

  /**
   * Sleep for one poll interval. An interrupt ends the sleep early but is
   * otherwise not acted upon here.
   * @return true if the sleep was cut short by an interrupt
   */
  private static boolean sleepBetweenPolls() {
    try {
      Thread.sleep(POLL_INTERVAL_MS);
      return false;
    } catch (InterruptedException e) {
      return true;
    }
  }

  /**
   *  The main loop for the thread.
   *  The loop does the following:
   *    Check the states of the running jobs
   *    Update the states of waiting jobs
   *    Submit the jobs in ready state
   *
   *  The loop ends once the runner state is neither RUNNING nor SUSPENDED
   *  (see {@link #stop()}), or when anything is thrown, in which case all
   *  jobs still in progress are failed. The state is STOPPED on exit.
   */
  public void run() {
    // An interrupt only wakes the loop up early. It is remembered here and
    // re-asserted on exit so the interrupt status is not lost; re-asserting
    // it inside the loop would make every following sleep return at once.
    boolean interrupted = false;
    try {
      runnerState.set(ThreadState.RUNNING);
      while (true) {
        while (runnerState.get() == ThreadState.SUSPENDED) {
          interrupted |= sleepBetweenPolls();
        }

        synchronized (this) {
          Iterator<ControlledJob> it = jobsInProgress.iterator();
          while (it.hasNext()) {
            ControlledJob j = it.next();
            LOG.debug("Checking state of job "+j);
            switch (j.checkState()) {
            case SUCCESS:
              successfulJobs.add(j);
              it.remove();
              break;
            case FAILED:
            case DEPENDENT_FAILED:
              failedJobs.add(j);
              it.remove();
              break;
            case READY:
              j.submit();
              break;
            case RUNNING:
            case WAITING:
              //Do Nothing
              break;
            }
          }
        }

        // Checked both before and after the pause so that a stop request
        // is honoured without a further pass over the jobs.
        if (!isRunnerActive()) {
          break;
        }
        interrupted |= sleepBetweenPolls();
        if (!isRunnerActive()) {
          break;
        }
      }
    } catch (Throwable t) {
      LOG.error("Error while trying to run jobs.",t);
      //Mark all jobs as failed because we got something bad.
      failAllJobs(t);
    } finally {
      // Publish STOPPED even if the clean-up above threw.
      runnerState.set(ThreadState.STOPPED);
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Fail every job still in progress and move it to the failed list.
   * Failing a job is best effort: an exception from one job is logged and
   * the job is moved to the failed list anyway.
   * @param t the error that caused the run loop to abort
   */
  synchronized private void failAllJobs(Throwable t) {
    String message = "Unexpected System Error Occured: "+
        StringUtils.stringifyException(t);
    // Restored only after the loop so the remaining jobs still get their
    // clean-up attempt with a clear interrupt status.
    boolean interrupted = false;
    Iterator<ControlledJob> it = jobsInProgress.iterator();
    while (it.hasNext()) {
      ControlledJob j = it.next();
      try {
        j.failJob(message);
      } catch (IOException e) {
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } catch (InterruptedException e) {
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
        interrupted = true;
      } finally {
        failedJobs.add(j);
        it.remove();
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}