/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.jobcontrol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
import org.apache.hadoop.util.StringUtils;

/**
 * This class encapsulates a set of MapReduce jobs and their dependencies.
 *
 * <p>It tracks the state of every job: jobs that have not finished yet are
 * kept in an in-progress list, and finished jobs are moved to either the
 * successful list or the failed list.
 *
 * <p>This class provides APIs for the client app to add a job to the group
 * and to get the jobs in the group in different states. When a job is
 * added, an ID unique to the group is assigned to the job.
 *
 * <p>{@link #run()} is meant to be executed on its own thread. That thread
 * submits jobs when they become ready, monitors the states of the running
 * jobs, and updates the states of jobs based on the state changes of the
 * jobs they depend on. The class provides APIs for suspending/resuming the
 * thread, and for stopping the thread.
 *
 * <p>The job lists are guarded by this object's monitor; the thread state is
 * a volatile field so that control calls made from other threads are seen
 * by the runner thread.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JobControl implements Runnable {
  private static final Log LOG = LogFactory.getLog(JobControl.class);

  /** How long the runner thread sleeps between polls, in milliseconds. */
  private static final long POLL_INTERVAL_MS = 5000;

  // The thread can be in one of the following states
  public static enum ThreadState {RUNNING, SUSPENDED, STOPPED, STOPPING, READY};

  // Written by stop()/suspend()/resume() on client threads and polled by the
  // runner thread in run(); volatile so each side sees the other's writes.
  private volatile ThreadState runnerState;

  // All three lists are guarded by "this".
  private final LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();

  // Last ID number handed out; starts at -1 so the first job gets number 0.
  private long nextJobID;
  private final String groupName;

  /**
   * Construct a job control for a group of jobs.
   * @param groupName a name identifying this group; it is also used as the
   *        prefix of every job ID assigned by {@link #addJob(ControlledJob)}
   */
  public JobControl(String groupName) {
    this.nextJobID = -1;
    this.groupName = groupName;
    this.runnerState = ThreadState.READY;
  }

  /** Returns a snapshot copy of the given jobs that callers may freely modify. */
  private static List<ControlledJob> toList(Collection<ControlledJob> jobs) {
    return new ArrayList<ControlledJob>(jobs);
  }

  /**
   * Returns a snapshot of the in-progress jobs whose current state is
   * {@code state}.
   */
  synchronized private List<ControlledJob> getJobsIn(State state) {
    List<ControlledJob> matching = new ArrayList<ControlledJob>();
    for (ControlledJob j : jobsInProgress) {
      if (j.getJobState() == state) {
        matching.add(j);
      }
    }
    return matching;
  }

  /**
   * @return the jobs in the waiting state
   */
  public List<ControlledJob> getWaitingJobList() {
    return getJobsIn(State.WAITING);
  }

  /**
   * @return the jobs in the running state
   */
  public List<ControlledJob> getRunningJobList() {
    return getJobsIn(State.RUNNING);
  }

  /**
   * @return the jobs in the ready state
   */
  public List<ControlledJob> getReadyJobsList() {
    return getJobsIn(State.READY);
  }

  /**
   * @return the jobs in the success state
   */
  synchronized public List<ControlledJob> getSuccessfulJobList() {
    return toList(this.successfulJobs);
  }

  /**
   * @return the jobs that failed, including jobs whose dependencies failed
   *         and jobs failed because the runner thread hit an unexpected error
   */
  synchronized public List<ControlledJob> getFailedJobList() {
    return toList(this.failedJobs);
  }

  /** Returns the next ID in this group: the group name followed by a counter. */
  private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
  }

  /**
   * Add a new job. The job is assigned a group-unique ID and put in the
   * WAITING state.
   * @param aJob the new job
   * @return the ID assigned to the job
   */
  synchronized public String addJob(ControlledJob aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setJobState(State.WAITING);
    jobsInProgress.add(aJob);
    return id;
  }

  /**
   * Add a collection of jobs.
   *
   * @param jobs the jobs to add, in the collection's iteration order
   */
  public void addJobCollection(Collection<ControlledJob> jobs) {
    for (ControlledJob job : jobs) {
      addJob(job);
    }
  }

  /**
   * @return the thread state
   */
  public ThreadState getThreadState() {
    return this.runnerState;
  }

  /**
   * Set the thread state to STOPPING so that the thread will stop the next
   * time it checks its state (between polling passes).
   */
  public void stop() {
    this.runnerState = ThreadState.STOPPING;
  }

  /**
   * Suspend the running thread. Has no effect unless the state is RUNNING.
   */
  public void suspend () {
    if (this.runnerState == ThreadState.RUNNING) {
      this.runnerState = ThreadState.SUSPENDED;
    }
  }

  /**
   * Resume the suspended thread. Has no effect unless the state is SUSPENDED.
   */
  public void resume () {
    if (this.runnerState == ThreadState.SUSPENDED) {
      this.runnerState = ThreadState.RUNNING;
    }
  }

  /**
   * @return true if no job is in progress (also true before any job is added)
   */
  synchronized public boolean allFinished() {
    return jobsInProgress.isEmpty();
  }

  /**
   * The main loop for the thread.
   * The loop does the following:
   *   Check the states of the running jobs
   *   Update the states of waiting jobs
   *   Submit the jobs in ready state
   *
   * <p>The loop polls every {@code POLL_INTERVAL_MS} milliseconds and exits
   * once the state is neither RUNNING nor SUSPENDED (e.g. after
   * {@link #stop()}). If anything unexpected is thrown, every job still in
   * progress is marked as failed. The state is STOPPED when this method
   * returns, and an interrupt received while sleeping is re-asserted on the
   * current thread on exit.
   */
  public void run() {
    boolean interrupted = false;
    try {
      this.runnerState = ThreadState.RUNNING;
      while (true) {
        while (this.runnerState == ThreadState.SUSPENDED) {
          interrupted |= sleepOnePollInterval();
        }
        // A stop() that arrives while suspended must not trigger one more
        // pass, which could submit further jobs.
        if (!isActive()) {
          break;
        }

        synchronized (this) {
          Iterator<ControlledJob> it = jobsInProgress.iterator();
          while (it.hasNext()) {
            ControlledJob j = it.next();
            if (LOG.isDebugEnabled()) {
              LOG.debug("Checking state of job "+j);
            }
            switch (j.checkState()) {
            case SUCCESS:
              successfulJobs.add(j);
              it.remove();
              break;
            case FAILED:
            case DEPENDENT_FAILED:
              failedJobs.add(j);
              it.remove();
              break;
            case READY:
              j.submit();
              break;
            case RUNNING:
            case WAITING:
              // Nothing to do until the job or its dependencies change state.
              break;
            }
          }
        }

        if (!isActive()) {
          break;
        }
        interrupted |= sleepOnePollInterval();
        if (!isActive()) {
          break;
        }
      }
    } catch (Throwable t) {
      LOG.error("Error while trying to run jobs.",t);
      // Mark all jobs as failed because we got something bad.
      failAllJobs(t);
    } finally {
      // In finally so the state is STOPPED even if failAllJobs itself throws.
      this.runnerState = ThreadState.STOPPED;
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** @return true while the runner should keep going (RUNNING or SUSPENDED). */
  private boolean isActive() {
    ThreadState state = this.runnerState; // read the volatile field once
    return state == ThreadState.RUNNING || state == ThreadState.SUSPENDED;
  }

  /**
   * Sleeps for one poll interval.
   * @return true if the sleep was cut short by an interrupt
   */
  private static boolean sleepOnePollInterval() {
    try {
      Thread.sleep(POLL_INTERVAL_MS);
      return false;
    } catch (InterruptedException e) {
      // The loop is terminated through stop(), so an interrupt only shortens
      // the wait; run() re-asserts the interrupt status when it exits.
      return true;
    }
  }

  /**
   * Moves every job still in progress to the failed list, asking each one to
   * fail with a message describing {@code t}. Best effort: an error while
   * failing one job is logged and the remaining jobs are still processed.
   */
  synchronized private void failAllJobs(Throwable t) {
    String message = "Unexpected System Error Occured: "+
    StringUtils.stringifyException(t);
    boolean interrupted = false;
    Iterator<ControlledJob> it = jobsInProgress.iterator();
    while (it.hasNext()) {
      ControlledJob j = it.next();
      try {
        j.failJob(message);
      } catch (IOException e) {
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } catch (InterruptedException e) {
        // Keep going so every job reaches the failed list; the interrupt
        // status is restored once all jobs have been handled.
        interrupted = true;
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } catch (RuntimeException e) {
        // Do not let one misbehaving job leave the others in progress.
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } finally {
        failedJobs.add(j);
        it.remove();
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}