001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.LinkedList;
026import java.util.List;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.mapred.jobcontrol.Job;
033import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
034import org.apache.hadoop.util.StringUtils;
035
036/** 
037 *  This class encapsulates a set of MapReduce jobs and its dependency.
038 *   
039 *  It tracks the states of the jobs by placing them into different tables
040 *  according to their states. 
041 *  
042 *  This class provides APIs for the client app to add a job to the group 
043 *  and to get the jobs in the group in different states. When a job is 
044 *  added, an ID unique to the group is assigned to the job. 
045 *  
046 *  This class has a thread that submits jobs when they become ready, 
047 *  monitors the states of the running jobs, and updates the states of jobs
048 *  based on the state changes of their depending jobs states. The class 
049 *  provides APIs for suspending/resuming the thread, and 
050 *  for stopping the thread.
051 *  
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Evolving
055public class JobControl implements Runnable {
056  private static final Log LOG = LogFactory.getLog(JobControl.class);
057
058  // The thread can be in one of the following state
059  public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
060        
061  private ThreadState runnerState;                      // the thread state
062        
063  private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
064  private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
065  private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
066        
067  private long nextJobID;
068  private String groupName;
069        
070  /** 
071   * Construct a job control for a group of jobs.
072   * @param groupName a name identifying this group
073   */
074  public JobControl(String groupName) {
075    this.nextJobID = -1;
076    this.groupName = groupName;
077    this.runnerState = ThreadState.READY;
078  }
079        
080  private static List<ControlledJob> toList(
081                   LinkedList<ControlledJob> jobs) {
082    ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
083    for (ControlledJob job : jobs) {
084      retv.add(job);
085    }
086    return retv;
087  }
088        
089  synchronized private List<ControlledJob> getJobsIn(State state) {
090    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
091    for(ControlledJob j: jobsInProgress) {
092      if(j.getJobState() == state) {
093        l.add(j);
094      }
095    }
096    return l;
097  }
098  
099  /**
100   * @return the jobs in the waiting state
101   */
102  public List<ControlledJob> getWaitingJobList() {
103    return getJobsIn(State.WAITING);
104  }
105        
106  /**
107   * @return the jobs in the running state
108   */
109  public List<ControlledJob> getRunningJobList() {
110    return getJobsIn(State.RUNNING);
111  }
112        
113  /**
114   * @return the jobs in the ready state
115   */
116  public List<ControlledJob> getReadyJobsList() {
117    return getJobsIn(State.READY);
118  }
119        
120  /**
121   * @return the jobs in the success state
122   */
123  synchronized public List<ControlledJob> getSuccessfulJobList() {
124    return toList(this.successfulJobs);
125  }
126        
127  synchronized public List<ControlledJob> getFailedJobList() {
128    return toList(this.failedJobs);
129  }
130        
131  private String getNextJobID() {
132    nextJobID += 1;
133    return this.groupName + this.nextJobID;
134  }
135
136  /**
137   * Add a new controlled job.
138   * @param aJob the new controlled job
139   */
140  synchronized public String addJob(ControlledJob aJob) {
141    String id = this.getNextJobID();
142    aJob.setJobID(id);
143    aJob.setJobState(State.WAITING);
144    jobsInProgress.add(aJob);
145    return id;  
146  }
147
148  /**
149   * Add a new job.
150   * @param aJob the new job
151   */
152  synchronized public String addJob(Job aJob) {
153    return addJob((ControlledJob) aJob);
154  }
155
156  /**
157   * Add a collection of jobs
158   * 
159   * @param jobs
160   */
161  public void addJobCollection(Collection<ControlledJob> jobs) {
162    for (ControlledJob job : jobs) {
163      addJob(job);
164    }
165  }
166        
167  /**
168   * @return the thread state
169   */
170  public ThreadState getThreadState() {
171    return this.runnerState;
172  }
173        
174  /**
175   * set the thread state to STOPPING so that the 
176   * thread will stop when it wakes up.
177   */
178  public void stop() {
179    this.runnerState = ThreadState.STOPPING;
180  }
181        
182  /**
183   * suspend the running thread
184   */
185  public void suspend () {
186    if (this.runnerState == ThreadState.RUNNING) {
187      this.runnerState = ThreadState.SUSPENDED;
188    }
189  }
190        
191  /**
192   * resume the suspended thread
193   */
194  public void resume () {
195    if (this.runnerState == ThreadState.SUSPENDED) {
196      this.runnerState = ThreadState.RUNNING;
197    }
198  }
199        
200  synchronized public boolean allFinished() {
201    return jobsInProgress.isEmpty();
202  }
203        
204  /**
205   *  The main loop for the thread.
206   *  The loop does the following:
207   *    Check the states of the running jobs
208   *    Update the states of waiting jobs
209   *    Submit the jobs in ready state
210   */
211  public void run() {
212    try {
213      this.runnerState = ThreadState.RUNNING;
214      while (true) {
215        while (this.runnerState == ThreadState.SUSPENDED) {
216          try {
217            Thread.sleep(5000);
218          }
219          catch (Exception e) {
220            //TODO the thread was interrupted, do something!!!
221          }
222        }
223        
224        synchronized(this) {
225          Iterator<ControlledJob> it = jobsInProgress.iterator();
226          while(it.hasNext()) {
227            ControlledJob j = it.next();
228            LOG.debug("Checking state of job "+j);
229            switch(j.checkState()) {
230            case SUCCESS:
231              successfulJobs.add(j);
232              it.remove();
233              break;
234            case FAILED:
235            case DEPENDENT_FAILED:
236              failedJobs.add(j);
237              it.remove();
238              break;
239            case READY:
240              j.submit();
241              break;
242            case RUNNING:
243            case WAITING:
244              //Do Nothing
245              break;
246            }
247          }
248        }
249        
250        if (this.runnerState != ThreadState.RUNNING && 
251            this.runnerState != ThreadState.SUSPENDED) {
252          break;
253        }
254        try {
255          Thread.sleep(5000);
256        }
257        catch (Exception e) {
258          //TODO the thread was interrupted, do something!!!
259        }
260        if (this.runnerState != ThreadState.RUNNING && 
261            this.runnerState != ThreadState.SUSPENDED) {
262          break;
263        }
264      }
265    }catch(Throwable t) {
266      LOG.error("Error while trying to run jobs.",t);
267      //Mark all jobs as failed because we got something bad.
268      failAllJobs(t);
269    }
270    this.runnerState = ThreadState.STOPPED;
271  }
272
273  synchronized private void failAllJobs(Throwable t) {
274    String message = "Unexpected System Error Occured: "+
275    StringUtils.stringifyException(t);
276    Iterator<ControlledJob> it = jobsInProgress.iterator();
277    while(it.hasNext()) {
278      ControlledJob j = it.next();
279      try {
280        j.failJob(message);
281      } catch (IOException e) {
282        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
283      } catch (InterruptedException e) {
284        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
285      } finally {
286        failedJobs.add(j);
287        it.remove();
288      }
289    }
290  }
291}