001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.mapreduce.Job;
034import org.apache.hadoop.mapreduce.JobID;
035import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
036import org.apache.hadoop.util.StringUtils;
037
038/** 
039 *  This class encapsulates a MapReduce job and its dependency. It monitors 
040 *  the states of the depending jobs and updates the state of this job.
041 *  A job starts in the WAITING state. If it does not have any depending jobs,
042 *  or all of the depending jobs are in SUCCESS state, then the job state 
043 *  will become READY. If any depending jobs fail, the job will fail too. 
044 *  When in READY state, the job can be submitted to Hadoop for execution, with
045 *  the state changing into RUNNING state. From RUNNING state, the job 
046 *  can get into SUCCESS or FAILED state, depending 
047 *  the status of the job execution.
048 */
049@InterfaceAudience.Public
050@InterfaceStability.Evolving
051public class ControlledJob {
052  private static final Log LOG = LogFactory.getLog(ControlledJob.class);
053
054  // A job will be in one of the following states
055  public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
056                            DEPENDENT_FAILED}; 
057  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
058  private State state;
059  private String controlID;     // assigned and used by JobControl class
060  private Job job;               // mapreduce job to be executed.
061  // some info for human consumption, e.g. the reason why the job failed
062  private String message;
063  // the jobs the current job depends on
064  private List<ControlledJob> dependingJobs;
065        
066  /** 
067   * Construct a job.
068   * @param job a mapreduce job to be executed.
069   * @param dependingJobs an array of jobs the current job depends on
070   */
071  public ControlledJob(Job job, List<ControlledJob> dependingJobs) 
072      throws IOException {
073    this.job = job;
074    this.dependingJobs = dependingJobs;
075    this.state = State.WAITING;
076    this.controlID = "unassigned";
077    this.message = "just initialized";
078  }
079  
080  /**
081   * Construct a job.
082   * 
083   * @param conf mapred job configuration representing a job to be executed.
084   * @throws IOException
085   */
086  public ControlledJob(Configuration conf) throws IOException {
087    this(new Job(conf), null);
088  }
089        
090  @Override
091  public String toString() {
092    StringBuffer sb = new StringBuffer();
093    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
094    sb.append("job id:\t").append(this.controlID).append("\n");
095    sb.append("job state:\t").append(this.state).append("\n");
096    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
097    sb.append("job message:\t").append(this.message).append("\n");
098                
099    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100      sb.append("job has no depending job:\t").append("\n");
101    } else {
102      sb.append("job has ").append(this.dependingJobs.size()).
103         append(" dependeng jobs:\n");
104      for (int i = 0; i < this.dependingJobs.size(); i++) {
105        sb.append("\t depending job ").append(i).append(":\t");
106        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107      }
108    }
109    return sb.toString();
110  }
111        
112  /**
113   * @return the job name of this job
114   */
115  public String getJobName() {
116    return job.getJobName();
117  }
118        
119  /**
120   * Set the job name for  this job.
121   * @param jobName the job name
122   */
123  public void setJobName(String jobName) {
124    job.setJobName(jobName);
125  }
126        
127  /**
128   * @return the job ID of this job assigned by JobControl
129   */
130  public String getJobID() {
131    return this.controlID;
132  }
133        
134  /**
135   * Set the job ID for  this job.
136   * @param id the job ID
137   */
138  public void setJobID(String id) {
139    this.controlID = id;
140  }
141        
142  /**
143   * @return the mapred ID of this job as assigned by the 
144   * mapred framework.
145   */
146  public JobID getMapredJobID() {
147    return this.job.getJobID();
148  }
149  
150  /**
151   * @return the mapreduce job 
152   */
153  public synchronized Job getJob() {
154    return this.job;
155  }
156
157  /**
158   * Set the mapreduce job
159   * @param job the mapreduce job for this job.
160   */
161  public synchronized void setJob(Job job) {
162    this.job = job;
163  }
164
165  /**
166   * @return the state of this job
167   */
168  public synchronized State getJobState() {
169    return this.state;
170  }
171        
172  /**
173   * Set the state for this job.
174   * @param state the new state for this job.
175   */
176  protected synchronized void setJobState(State state) {
177    this.state = state;
178  }
179        
180  /**
181   * @return the message of this job
182   */
183  public synchronized String getMessage() {
184    return this.message;
185  }
186
187  /**
188   * Set the message for this job.
189   * @param message the message for this job.
190   */
191  public synchronized void setMessage(String message) {
192    this.message = message;
193  }
194
195  /**
196   * @return the depending jobs of this job
197   */
198  public List<ControlledJob> getDependentJobs() {
199    return this.dependingJobs;
200  }
201  
202  /**
203   * Add a job to this jobs' dependency list. 
204   * Dependent jobs can only be added while a Job 
205   * is waiting to run, not during or afterwards.
206   * 
207   * @param dependingJob Job that this Job depends on.
208   * @return <tt>true</tt> if the Job was added.
209   */
210  public synchronized boolean addDependingJob(ControlledJob dependingJob) {
211    if (this.state == State.WAITING) { //only allowed to add jobs when waiting
212      if (this.dependingJobs == null) {
213        this.dependingJobs = new ArrayList<ControlledJob>();
214      }
215      return this.dependingJobs.add(dependingJob);
216    } else {
217      return false;
218    }
219  }
220        
221  /**
222   * @return true if this job is in a complete state
223   */
224  public synchronized boolean isCompleted() {
225    return this.state == State.FAILED || 
226      this.state == State.DEPENDENT_FAILED ||
227      this.state == State.SUCCESS;
228  }
229        
230  /**
231   * @return true if this job is in READY state
232   */
233  public synchronized boolean isReady() {
234    return this.state == State.READY;
235  }
236
237  public void killJob() throws IOException, InterruptedException {
238    job.killJob();
239  }
240  
241  public synchronized void failJob(String message) throws IOException, InterruptedException {
242    try {
243      if(job != null && this.state == State.RUNNING) {
244        job.killJob();
245      }
246    } finally {
247      this.state = State.FAILED;
248      this.message = message;
249    }
250  }
251  
252  /**
253   * Check the state of this running job. The state may 
254   * remain the same, become SUCCESS or FAILED.
255   */
256  private void checkRunningState() throws IOException, InterruptedException {
257    try {
258      if (job.isComplete()) {
259        if (job.isSuccessful()) {
260          this.state = State.SUCCESS;
261        } else {
262          this.state = State.FAILED;
263          this.message = "Job failed!";
264        }
265      }
266    } catch (IOException ioe) {
267      this.state = State.FAILED;
268      this.message = StringUtils.stringifyException(ioe);
269      try {
270        if (job != null) {
271          job.killJob();
272        }
273      } catch (IOException e) {}
274    }
275  }
276        
277  /**
278   * Check and update the state of this job. The state changes  
279   * depending on its current state and the states of the depending jobs.
280   */
281   synchronized State checkState() throws IOException, InterruptedException {
282    if (this.state == State.RUNNING) {
283      checkRunningState();
284    }
285    if (this.state != State.WAITING) {
286      return this.state;
287    }
288    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
289      this.state = State.READY;
290      return this.state;
291    }
292    ControlledJob pred = null;
293    int n = this.dependingJobs.size();
294    for (int i = 0; i < n; i++) {
295      pred = this.dependingJobs.get(i);
296      State s = pred.checkState();
297      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
298        break; // a pred is still not completed, continue in WAITING
299        // state
300      }
301      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
302        this.state = State.DEPENDENT_FAILED;
303        this.message = "depending job " + i + " with jobID "
304          + pred.getJobID() + " failed. " + pred.getMessage();
305        break;
306      }
307      // pred must be in success state
308      if (i == n - 1) {
309        this.state = State.READY;
310      }
311    }
312
313    return this.state;
314  }
315        
316  /**
317   * Submit this job to mapred. The state becomes RUNNING if submission 
318   * is successful, FAILED otherwise.  
319   */
320  protected synchronized void submit() {
321    try {
322      Configuration conf = job.getConfiguration();
323      if (conf.getBoolean(CREATE_DIR, false)) {
324        FileSystem fs = FileSystem.get(conf);
325        Path inputPaths[] = FileInputFormat.getInputPaths(job);
326        for (int i = 0; i < inputPaths.length; i++) {
327          if (!fs.exists(inputPaths[i])) {
328            try {
329              fs.mkdirs(inputPaths[i]);
330            } catch (IOException e) {
331
332            }
333          }
334        }
335      }
336      job.submit();
337      this.state = State.RUNNING;
338    } catch (Exception ioe) {
339      LOG.info(getJobName()+" got an error while submitting ",ioe);
340      this.state = State.FAILED;
341      this.message = StringUtils.stringifyException(ioe);
342    }
343  }
344        
345}