001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.jobcontrol;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collection;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    import org.apache.hadoop.mapred.jobcontrol.Job;
033    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
034    import org.apache.hadoop.util.StringUtils;
035    
036    /** 
037     *  This class encapsulates a set of MapReduce jobs and its dependency.
038     *   
039     *  It tracks the states of the jobs by placing them into different tables
040     *  according to their states. 
041     *  
042     *  This class provides APIs for the client app to add a job to the group 
043     *  and to get the jobs in the group in different states. When a job is 
044     *  added, an ID unique to the group is assigned to the job. 
045     *  
046     *  This class has a thread that submits jobs when they become ready, 
047     *  monitors the states of the running jobs, and updates the states of jobs
048     *  based on the state changes of their depending jobs states. The class 
049     *  provides APIs for suspending/resuming the thread, and 
050     *  for stopping the thread.
051     *  
052     */
053    @InterfaceAudience.Public
054    @InterfaceStability.Evolving
055    public class JobControl implements Runnable {
056      private static final Log LOG = LogFactory.getLog(JobControl.class);
057    
058      // The thread can be in one of the following state
059      public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
060            
061      private ThreadState runnerState;                      // the thread state
062            
063      private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
064      private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
065      private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
066            
067      private long nextJobID;
068      private String groupName;
069            
070      /** 
071       * Construct a job control for a group of jobs.
072       * @param groupName a name identifying this group
073       */
074      public JobControl(String groupName) {
075        this.nextJobID = -1;
076        this.groupName = groupName;
077        this.runnerState = ThreadState.READY;
078      }
079            
080      synchronized private static List<ControlledJob> toList(
081                       LinkedList<ControlledJob> jobs) {
082        ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
083        for (ControlledJob job : jobs) {
084          retv.add(job);
085        }
086        return retv;
087      }
088            
089      synchronized private List<ControlledJob> getJobsIn(State state) {
090        LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
091        for(ControlledJob j: jobsInProgress) {
092          if(j.getJobState() == state) {
093            l.add(j);
094          }
095        }
096        return l;
097      }
098      
099      /**
100       * @return the jobs in the waiting state
101       */
102      public List<ControlledJob> getWaitingJobList() {
103        return getJobsIn(State.WAITING);
104      }
105            
106      /**
107       * @return the jobs in the running state
108       */
109      public List<ControlledJob> getRunningJobList() {
110        return getJobsIn(State.RUNNING);
111      }
112            
113      /**
114       * @return the jobs in the ready state
115       */
116      public List<ControlledJob> getReadyJobsList() {
117        return getJobsIn(State.READY);
118      }
119            
120      /**
121       * @return the jobs in the success state
122       */
123      public List<ControlledJob> getSuccessfulJobList() {
124        return toList(this.successfulJobs);
125      }
126            
127      public List<ControlledJob> getFailedJobList() {
128        return toList(this.failedJobs);
129      }
130            
131      private String getNextJobID() {
132        nextJobID += 1;
133        return this.groupName + this.nextJobID;
134      }
135    
136      /**
137       * Add a new controlled job.
138       * @param aJob the new controlled job
139       */
140      synchronized public String addJob(ControlledJob aJob) {
141        String id = this.getNextJobID();
142        aJob.setJobID(id);
143        aJob.setJobState(State.WAITING);
144        jobsInProgress.add(aJob);
145        return id;  
146      }
147    
148      /**
149       * Add a new job.
150       * @param aJob the new job
151       */
152      synchronized public String addJob(Job aJob) {
153        return addJob((ControlledJob) aJob);
154      }
155    
156      /**
157       * Add a collection of jobs
158       * 
159       * @param jobs
160       */
161      public void addJobCollection(Collection<ControlledJob> jobs) {
162        for (ControlledJob job : jobs) {
163          addJob(job);
164        }
165      }
166            
167      /**
168       * @return the thread state
169       */
170      public ThreadState getThreadState() {
171        return this.runnerState;
172      }
173            
174      /**
175       * set the thread state to STOPPING so that the 
176       * thread will stop when it wakes up.
177       */
178      public void stop() {
179        this.runnerState = ThreadState.STOPPING;
180      }
181            
182      /**
183       * suspend the running thread
184       */
185      public void suspend () {
186        if (this.runnerState == ThreadState.RUNNING) {
187          this.runnerState = ThreadState.SUSPENDED;
188        }
189      }
190            
191      /**
192       * resume the suspended thread
193       */
194      public void resume () {
195        if (this.runnerState == ThreadState.SUSPENDED) {
196          this.runnerState = ThreadState.RUNNING;
197        }
198      }
199            
200      synchronized public boolean allFinished() {
201        return jobsInProgress.isEmpty();
202      }
203            
204      /**
205       *  The main loop for the thread.
206       *  The loop does the following:
207       *    Check the states of the running jobs
208       *    Update the states of waiting jobs
209       *    Submit the jobs in ready state
210       */
211      public void run() {
212        try {
213          this.runnerState = ThreadState.RUNNING;
214          while (true) {
215            while (this.runnerState == ThreadState.SUSPENDED) {
216              try {
217                Thread.sleep(5000);
218              }
219              catch (Exception e) {
220                //TODO the thread was interrupted, do something!!!
221              }
222            }
223            
224            synchronized(this) {
225              Iterator<ControlledJob> it = jobsInProgress.iterator();
226              while(it.hasNext()) {
227                ControlledJob j = it.next();
228                LOG.debug("Checking state of job "+j);
229                switch(j.checkState()) {
230                case SUCCESS:
231                  successfulJobs.add(j);
232                  it.remove();
233                  break;
234                case FAILED:
235                case DEPENDENT_FAILED:
236                  failedJobs.add(j);
237                  it.remove();
238                  break;
239                case READY:
240                  j.submit();
241                  break;
242                case RUNNING:
243                case WAITING:
244                  //Do Nothing
245                  break;
246                }
247              }
248            }
249            
250            if (this.runnerState != ThreadState.RUNNING && 
251                this.runnerState != ThreadState.SUSPENDED) {
252              break;
253            }
254            try {
255              Thread.sleep(5000);
256            }
257            catch (Exception e) {
258              //TODO the thread was interrupted, do something!!!
259            }
260            if (this.runnerState != ThreadState.RUNNING && 
261                this.runnerState != ThreadState.SUSPENDED) {
262              break;
263            }
264          }
265        }catch(Throwable t) {
266          LOG.error("Error while trying to run jobs.",t);
267          //Mark all jobs as failed because we got something bad.
268          failAllJobs(t);
269        }
270        this.runnerState = ThreadState.STOPPED;
271      }
272    
273      synchronized private void failAllJobs(Throwable t) {
274        String message = "Unexpected System Error Occured: "+
275        StringUtils.stringifyException(t);
276        Iterator<ControlledJob> it = jobsInProgress.iterator();
277        while(it.hasNext()) {
278          ControlledJob j = it.next();
279          try {
280            j.failJob(message);
281          } catch (IOException e) {
282            LOG.error("Error while tyring to clean up "+j.getJobName(), e);
283          } catch (InterruptedException e) {
284            LOG.error("Error while tyring to clean up "+j.getJobName(), e);
285          } finally {
286            failedJobs.add(j);
287            it.remove();
288          }
289        }
290      }
291    }