001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
import org.apache.hadoop.util.StringUtils;
035
036 /**
037 * This class encapsulates a set of MapReduce jobs and its dependency.
038 *
039 * It tracks the states of the jobs by placing them into different tables
040 * according to their states.
041 *
042 * This class provides APIs for the client app to add a job to the group
043 * and to get the jobs in the group in different states. When a job is
044 * added, an ID unique to the group is assigned to the job.
045 *
046 * This class has a thread that submits jobs when they become ready,
047 * monitors the states of the running jobs, and updates the states of jobs
048 * based on the state changes of their depending jobs states. The class
049 * provides APIs for suspending/resuming the thread, and
050 * for stopping the thread.
051 *
052 */
053 @InterfaceAudience.Public
054 @InterfaceStability.Evolving
055 public class JobControl implements Runnable {
056 private static final Log LOG = LogFactory.getLog(JobControl.class);
057
058 // The thread can be in one of the following state
059 public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
060
061 private ThreadState runnerState; // the thread state
062
063 private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
064 private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
065 private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
066
067 private long nextJobID;
068 private String groupName;
069
070 /**
071 * Construct a job control for a group of jobs.
072 * @param groupName a name identifying this group
073 */
074 public JobControl(String groupName) {
075 this.nextJobID = -1;
076 this.groupName = groupName;
077 this.runnerState = ThreadState.READY;
078 }
079
080 private static List<ControlledJob> toList(
081 LinkedList<ControlledJob> jobs) {
082 ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
083 for (ControlledJob job : jobs) {
084 retv.add(job);
085 }
086 return retv;
087 }
088
089 synchronized private List<ControlledJob> getJobsIn(State state) {
090 LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
091 for(ControlledJob j: jobsInProgress) {
092 if(j.getJobState() == state) {
093 l.add(j);
094 }
095 }
096 return l;
097 }
098
099 /**
100 * @return the jobs in the waiting state
101 */
102 public List<ControlledJob> getWaitingJobList() {
103 return getJobsIn(State.WAITING);
104 }
105
106 /**
107 * @return the jobs in the running state
108 */
109 public List<ControlledJob> getRunningJobList() {
110 return getJobsIn(State.RUNNING);
111 }
112
113 /**
114 * @return the jobs in the ready state
115 */
116 public List<ControlledJob> getReadyJobsList() {
117 return getJobsIn(State.READY);
118 }
119
120 /**
121 * @return the jobs in the success state
122 */
123 synchronized public List<ControlledJob> getSuccessfulJobList() {
124 return toList(this.successfulJobs);
125 }
126
127 synchronized public List<ControlledJob> getFailedJobList() {
128 return toList(this.failedJobs);
129 }
130
131 private String getNextJobID() {
132 nextJobID += 1;
133 return this.groupName + this.nextJobID;
134 }
135
136 /**
137 * Add a new controlled job.
138 * @param aJob the new controlled job
139 */
140 synchronized public String addJob(ControlledJob aJob) {
141 String id = this.getNextJobID();
142 aJob.setJobID(id);
143 aJob.setJobState(State.WAITING);
144 jobsInProgress.add(aJob);
145 return id;
146 }
147
148 /**
149 * Add a new job.
150 * @param aJob the new job
151 */
152 synchronized public String addJob(Job aJob) {
153 return addJob((ControlledJob) aJob);
154 }
155
156 /**
157 * Add a collection of jobs
158 *
159 * @param jobs
160 */
161 public void addJobCollection(Collection<ControlledJob> jobs) {
162 for (ControlledJob job : jobs) {
163 addJob(job);
164 }
165 }
166
167 /**
168 * @return the thread state
169 */
170 public ThreadState getThreadState() {
171 return this.runnerState;
172 }
173
174 /**
175 * set the thread state to STOPPING so that the
176 * thread will stop when it wakes up.
177 */
178 public void stop() {
179 this.runnerState = ThreadState.STOPPING;
180 }
181
182 /**
183 * suspend the running thread
184 */
185 public void suspend () {
186 if (this.runnerState == ThreadState.RUNNING) {
187 this.runnerState = ThreadState.SUSPENDED;
188 }
189 }
190
191 /**
192 * resume the suspended thread
193 */
194 public void resume () {
195 if (this.runnerState == ThreadState.SUSPENDED) {
196 this.runnerState = ThreadState.RUNNING;
197 }
198 }
199
200 synchronized public boolean allFinished() {
201 return jobsInProgress.isEmpty();
202 }
203
204 /**
205 * The main loop for the thread.
206 * The loop does the following:
207 * Check the states of the running jobs
208 * Update the states of waiting jobs
209 * Submit the jobs in ready state
210 */
211 public void run() {
212 try {
213 this.runnerState = ThreadState.RUNNING;
214 while (true) {
215 while (this.runnerState == ThreadState.SUSPENDED) {
216 try {
217 Thread.sleep(5000);
218 }
219 catch (Exception e) {
220 //TODO the thread was interrupted, do something!!!
221 }
222 }
223
224 synchronized(this) {
225 Iterator<ControlledJob> it = jobsInProgress.iterator();
226 while(it.hasNext()) {
227 ControlledJob j = it.next();
228 LOG.debug("Checking state of job "+j);
229 switch(j.checkState()) {
230 case SUCCESS:
231 successfulJobs.add(j);
232 it.remove();
233 break;
234 case FAILED:
235 case DEPENDENT_FAILED:
236 failedJobs.add(j);
237 it.remove();
238 break;
239 case READY:
240 j.submit();
241 break;
242 case RUNNING:
243 case WAITING:
244 //Do Nothing
245 break;
246 }
247 }
248 }
249
250 if (this.runnerState != ThreadState.RUNNING &&
251 this.runnerState != ThreadState.SUSPENDED) {
252 break;
253 }
254 try {
255 Thread.sleep(5000);
256 }
257 catch (Exception e) {
258 //TODO the thread was interrupted, do something!!!
259 }
260 if (this.runnerState != ThreadState.RUNNING &&
261 this.runnerState != ThreadState.SUSPENDED) {
262 break;
263 }
264 }
265 }catch(Throwable t) {
266 LOG.error("Error while trying to run jobs.",t);
267 //Mark all jobs as failed because we got something bad.
268 failAllJobs(t);
269 }
270 this.runnerState = ThreadState.STOPPED;
271 }
272
273 synchronized private void failAllJobs(Throwable t) {
274 String message = "Unexpected System Error Occured: "+
275 StringUtils.stringifyException(t);
276 Iterator<ControlledJob> it = jobsInProgress.iterator();
277 while(it.hasNext()) {
278 ControlledJob j = it.next();
279 try {
280 j.failJob(message);
281 } catch (IOException e) {
282 LOG.error("Error while tyring to clean up "+j.getJobName(), e);
283 } catch (InterruptedException e) {
284 LOG.error("Error while tyring to clean up "+j.getJobName(), e);
285 } finally {
286 failedJobs.add(j);
287 it.remove();
288 }
289 }
290 }
291 }