001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.LinkedList;
026import java.util.List;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
033import org.apache.hadoop.util.StringUtils;
034
035/** 
036 *  This class encapsulates a set of MapReduce jobs and its dependency.
037 *   
038 *  It tracks the states of the jobs by placing them into different tables
039 *  according to their states. 
040 *  
041 *  This class provides APIs for the client app to add a job to the group 
042 *  and to get the jobs in the group in different states. When a job is 
043 *  added, an ID unique to the group is assigned to the job. 
044 *  
045 *  This class has a thread that submits jobs when they become ready, 
046 *  monitors the states of the running jobs, and updates the states of jobs
047 *  based on the state changes of their depending jobs states. The class 
048 *  provides APIs for suspending/resuming the thread, and 
049 *  for stopping the thread.
050 *  
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Evolving
054public class JobControl implements Runnable {
055  private static final Log LOG = LogFactory.getLog(JobControl.class);
056
057  // The thread can be in one of the following state
058  public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
059        
060  private ThreadState runnerState;                      // the thread state
061        
062  private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
063  private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
064  private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
065        
066  private long nextJobID;
067  private String groupName;
068        
069  /** 
070   * Construct a job control for a group of jobs.
071   * @param groupName a name identifying this group
072   */
073  public JobControl(String groupName) {
074    this.nextJobID = -1;
075    this.groupName = groupName;
076    this.runnerState = ThreadState.READY;
077  }
078        
079  private static List<ControlledJob> toList(
080                   LinkedList<ControlledJob> jobs) {
081    ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
082    for (ControlledJob job : jobs) {
083      retv.add(job);
084    }
085    return retv;
086  }
087        
088  synchronized private List<ControlledJob> getJobsIn(State state) {
089    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
090    for(ControlledJob j: jobsInProgress) {
091      if(j.getJobState() == state) {
092        l.add(j);
093      }
094    }
095    return l;
096  }
097  
098  /**
099   * @return the jobs in the waiting state
100   */
101  public List<ControlledJob> getWaitingJobList() {
102    return getJobsIn(State.WAITING);
103  }
104        
105  /**
106   * @return the jobs in the running state
107   */
108  public List<ControlledJob> getRunningJobList() {
109    return getJobsIn(State.RUNNING);
110  }
111        
112  /**
113   * @return the jobs in the ready state
114   */
115  public List<ControlledJob> getReadyJobsList() {
116    return getJobsIn(State.READY);
117  }
118        
119  /**
120   * @return the jobs in the success state
121   */
122  synchronized public List<ControlledJob> getSuccessfulJobList() {
123    return toList(this.successfulJobs);
124  }
125        
126  synchronized public List<ControlledJob> getFailedJobList() {
127    return toList(this.failedJobs);
128  }
129        
130  private String getNextJobID() {
131    nextJobID += 1;
132    return this.groupName + this.nextJobID;
133  }
134
135  /**
136   * Add a new job.
137   * @param aJob the new job
138   */
139  synchronized public String addJob(ControlledJob aJob) {
140    String id = this.getNextJobID();
141    aJob.setJobID(id);
142    aJob.setJobState(State.WAITING);
143    jobsInProgress.add(aJob);
144    return id;  
145  }
146        
147  /**
148   * Add a collection of jobs
149   * 
150   * @param jobs
151   */
152  public void addJobCollection(Collection<ControlledJob> jobs) {
153    for (ControlledJob job : jobs) {
154      addJob(job);
155    }
156  }
157        
158  /**
159   * @return the thread state
160   */
161  public ThreadState getThreadState() {
162    return this.runnerState;
163  }
164        
165  /**
166   * set the thread state to STOPPING so that the 
167   * thread will stop when it wakes up.
168   */
169  public void stop() {
170    this.runnerState = ThreadState.STOPPING;
171  }
172        
173  /**
174   * suspend the running thread
175   */
176  public void suspend () {
177    if (this.runnerState == ThreadState.RUNNING) {
178      this.runnerState = ThreadState.SUSPENDED;
179    }
180  }
181        
182  /**
183   * resume the suspended thread
184   */
185  public void resume () {
186    if (this.runnerState == ThreadState.SUSPENDED) {
187      this.runnerState = ThreadState.RUNNING;
188    }
189  }
190        
191  synchronized public boolean allFinished() {
192    return jobsInProgress.isEmpty();
193  }
194        
195  /**
196   *  The main loop for the thread.
197   *  The loop does the following:
198   *    Check the states of the running jobs
199   *    Update the states of waiting jobs
200   *    Submit the jobs in ready state
201   */
202  public void run() {
203    try {
204      this.runnerState = ThreadState.RUNNING;
205      while (true) {
206        while (this.runnerState == ThreadState.SUSPENDED) {
207          try {
208            Thread.sleep(5000);
209          }
210          catch (Exception e) {
211            //TODO the thread was interrupted, do something!!!
212          }
213        }
214        
215        synchronized(this) {
216          Iterator<ControlledJob> it = jobsInProgress.iterator();
217          while(it.hasNext()) {
218            ControlledJob j = it.next();
219            LOG.debug("Checking state of job "+j);
220            switch(j.checkState()) {
221            case SUCCESS:
222              successfulJobs.add(j);
223              it.remove();
224              break;
225            case FAILED:
226            case DEPENDENT_FAILED:
227              failedJobs.add(j);
228              it.remove();
229              break;
230            case READY:
231              j.submit();
232              break;
233            case RUNNING:
234            case WAITING:
235              //Do Nothing
236              break;
237            }
238          }
239        }
240        
241        if (this.runnerState != ThreadState.RUNNING && 
242            this.runnerState != ThreadState.SUSPENDED) {
243          break;
244        }
245        try {
246          Thread.sleep(5000);
247        }
248        catch (Exception e) {
249          //TODO the thread was interrupted, do something!!!
250        }
251        if (this.runnerState != ThreadState.RUNNING && 
252            this.runnerState != ThreadState.SUSPENDED) {
253          break;
254        }
255      }
256    }catch(Throwable t) {
257      LOG.error("Error while trying to run jobs.",t);
258      //Mark all jobs as failed because we got something bad.
259      failAllJobs(t);
260    }
261    this.runnerState = ThreadState.STOPPED;
262  }
263
264  synchronized private void failAllJobs(Throwable t) {
265    String message = "Unexpected System Error Occured: "+
266    StringUtils.stringifyException(t);
267    Iterator<ControlledJob> it = jobsInProgress.iterator();
268    while(it.hasNext()) {
269      ControlledJob j = it.next();
270      try {
271        j.failJob(message);
272      } catch (IOException e) {
273        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
274      } catch (InterruptedException e) {
275        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
276      } finally {
277        failedJobs.add(j);
278        it.remove();
279      }
280    }
281  }
282}