001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.mapreduce.Job;
034import org.apache.hadoop.mapreduce.JobID;
035import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
036import org.apache.hadoop.util.StringUtils;
037
038/** 
039 *  This class encapsulates a MapReduce job and its dependency. It monitors 
040 *  the states of the depending jobs and updates the state of this job.
041 *  A job starts in the WAITING state. If it does not have any depending jobs,
042 *  or all of the depending jobs are in SUCCESS state, then the job state 
043 *  will become READY. If any depending jobs fail, the job will fail too. 
044 *  When in READY state, the job can be submitted to Hadoop for execution, with
045 *  the state changing into RUNNING state. From RUNNING state, the job 
046 *  can get into SUCCESS or FAILED state, depending 
047 *  the status of the job execution.
048 */
049@InterfaceAudience.Public
050@InterfaceStability.Evolving
051public class ControlledJob {
052  private static final Log LOG = LogFactory.getLog(ControlledJob.class);
053
054  // A job will be in one of the following states
055  public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
056                            DEPENDENT_FAILED}; 
057  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
058  private State state;
059  private String controlID;     // assigned and used by JobControl class
060  private Job job;               // mapreduce job to be executed.
061  // some info for human consumption, e.g. the reason why the job failed
062  private String message;
063  // the jobs the current job depends on
064  private List<ControlledJob> dependingJobs;
065        
066  /** 
067   * Construct a job.
068   * @param job a mapreduce job to be executed.
069   * @param dependingJobs an array of jobs the current job depends on
070   */
071  public ControlledJob(Job job, List<ControlledJob> dependingJobs) 
072      throws IOException {
073    this.job = job;
074    this.dependingJobs = dependingJobs;
075    this.state = State.WAITING;
076    this.controlID = "unassigned";
077    this.message = "just initialized";
078  }
079  
080  /**
081   * Construct a job.
082   * 
083   * @param conf mapred job configuration representing a job to be executed.
084   * @throws IOException
085   */
086  public ControlledJob(Configuration conf) throws IOException {
087    this(Job.getInstance(conf), null);
088  }
089        
090  @Override
091  public String toString() {
092    StringBuffer sb = new StringBuffer();
093    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
094    sb.append("job id:\t").append(this.controlID).append("\n");
095    sb.append("job state:\t").append(this.state).append("\n");
096    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
097    sb.append("job message:\t").append(this.message).append("\n");
098                
099    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100      sb.append("job has no depending job:\t").append("\n");
101    } else {
102      sb.append("job has ").append(this.dependingJobs.size()).
103         append(" dependeng jobs:\n");
104      for (int i = 0; i < this.dependingJobs.size(); i++) {
105        sb.append("\t depending job ").append(i).append(":\t");
106        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107      }
108    }
109    return sb.toString();
110  }
111        
112  /**
113   * @return the job name of this job
114   */
115  public String getJobName() {
116    return job.getJobName();
117  }
118        
119  /**
120   * Set the job name for  this job.
121   * @param jobName the job name
122   */
123  public void setJobName(String jobName) {
124    job.setJobName(jobName);
125  }
126        
127  /**
128   * @return the job ID of this job assigned by JobControl
129   */
130  public String getJobID() {
131    return this.controlID;
132  }
133        
134  /**
135   * Set the job ID for  this job.
136   * @param id the job ID
137   */
138  public void setJobID(String id) {
139    this.controlID = id;
140  }
141
142  /**
143   * @return the mapred ID of this job as assigned by the mapred framework.
144   */
145  public synchronized JobID getMapredJobId() {
146    return this.job.getJobID();
147  }
148  
149  /**
150   * @return the mapreduce job 
151   */
152  public synchronized Job getJob() {
153    return this.job;
154  }
155
156  /**
157   * Set the mapreduce job
158   * @param job the mapreduce job for this job.
159   */
160  public synchronized void setJob(Job job) {
161    this.job = job;
162  }
163
164  /**
165   * @return the state of this job
166   */
167  public synchronized State getJobState() {
168    return this.state;
169  }
170        
171  /**
172   * Set the state for this job.
173   * @param state the new state for this job.
174   */
175  protected synchronized void setJobState(State state) {
176    this.state = state;
177  }
178        
179  /**
180   * @return the message of this job
181   */
182  public synchronized String getMessage() {
183    return this.message;
184  }
185
186  /**
187   * Set the message for this job.
188   * @param message the message for this job.
189   */
190  public synchronized void setMessage(String message) {
191    this.message = message;
192  }
193
194  /**
195   * @return the depending jobs of this job
196   */
197  public List<ControlledJob> getDependentJobs() {
198    return this.dependingJobs;
199  }
200  
201  /**
202   * Add a job to this jobs' dependency list. 
203   * Dependent jobs can only be added while a Job 
204   * is waiting to run, not during or afterwards.
205   * 
206   * @param dependingJob Job that this Job depends on.
207   * @return <tt>true</tt> if the Job was added.
208   */
209  public synchronized boolean addDependingJob(ControlledJob dependingJob) {
210    if (this.state == State.WAITING) { //only allowed to add jobs when waiting
211      if (this.dependingJobs == null) {
212        this.dependingJobs = new ArrayList<ControlledJob>();
213      }
214      return this.dependingJobs.add(dependingJob);
215    } else {
216      return false;
217    }
218  }
219        
220  /**
221   * @return true if this job is in a complete state
222   */
223  public synchronized boolean isCompleted() {
224    return this.state == State.FAILED || 
225      this.state == State.DEPENDENT_FAILED ||
226      this.state == State.SUCCESS;
227  }
228        
229  /**
230   * @return true if this job is in READY state
231   */
232  public synchronized boolean isReady() {
233    return this.state == State.READY;
234  }
235
236  public void killJob() throws IOException, InterruptedException {
237    job.killJob();
238  }
239  
240  public synchronized void failJob(String message) throws IOException, InterruptedException {
241    try {
242      if(job != null && this.state == State.RUNNING) {
243        job.killJob();
244      }
245    } finally {
246      this.state = State.FAILED;
247      this.message = message;
248    }
249  }
250  
251  /**
252   * Check the state of this running job. The state may 
253   * remain the same, become SUCCESS or FAILED.
254   */
255  private void checkRunningState() throws IOException, InterruptedException {
256    try {
257      if (job.isComplete()) {
258        if (job.isSuccessful()) {
259          this.state = State.SUCCESS;
260        } else {
261          this.state = State.FAILED;
262          this.message = "Job failed!";
263        }
264      }
265    } catch (IOException ioe) {
266      this.state = State.FAILED;
267      this.message = StringUtils.stringifyException(ioe);
268      try {
269        if (job != null) {
270          job.killJob();
271        }
272      } catch (IOException e) {}
273    }
274  }
275        
276  /**
277   * Check and update the state of this job. The state changes  
278   * depending on its current state and the states of the depending jobs.
279   */
280   synchronized State checkState() throws IOException, InterruptedException {
281    if (this.state == State.RUNNING) {
282      checkRunningState();
283    }
284    if (this.state != State.WAITING) {
285      return this.state;
286    }
287    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
288      this.state = State.READY;
289      return this.state;
290    }
291    ControlledJob pred = null;
292    int n = this.dependingJobs.size();
293    for (int i = 0; i < n; i++) {
294      pred = this.dependingJobs.get(i);
295      State s = pred.checkState();
296      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
297        break; // a pred is still not completed, continue in WAITING
298        // state
299      }
300      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
301        this.state = State.DEPENDENT_FAILED;
302        this.message = "depending job " + i + " with jobID "
303          + pred.getJobID() + " failed. " + pred.getMessage();
304        break;
305      }
306      // pred must be in success state
307      if (i == n - 1) {
308        this.state = State.READY;
309      }
310    }
311
312    return this.state;
313  }
314        
315  /**
316   * Submit this job to mapred. The state becomes RUNNING if submission 
317   * is successful, FAILED otherwise.  
318   */
319  protected synchronized void submit() {
320    try {
321      Configuration conf = job.getConfiguration();
322      if (conf.getBoolean(CREATE_DIR, false)) {
323        FileSystem fs = FileSystem.get(conf);
324        Path inputPaths[] = FileInputFormat.getInputPaths(job);
325        for (int i = 0; i < inputPaths.length; i++) {
326          if (!fs.exists(inputPaths[i])) {
327            try {
328              fs.mkdirs(inputPaths[i]);
329            } catch (IOException e) {
330
331            }
332          }
333        }
334      }
335      job.submit();
336      this.state = State.RUNNING;
337    } catch (Exception ioe) {
338      LOG.info(getJobName()+" got an error while submitting ",ioe);
339      this.state = State.FAILED;
340      this.message = StringUtils.stringifyException(ioe);
341    }
342  }
343        
344}