001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.jobcontrol;
020    
021    
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.mapreduce.Job;
034    import org.apache.hadoop.mapreduce.JobID;
035    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
036    import org.apache.hadoop.util.StringUtils;
037    
038    /** 
039     *  This class encapsulates a MapReduce job and its dependency. It monitors 
040     *  the states of the depending jobs and updates the state of this job.
041     *  A job starts in the WAITING state. If it does not have any depending jobs,
042     *  or all of the depending jobs are in SUCCESS state, then the job state 
043     *  will become READY. If any depending jobs fail, the job will fail too. 
044     *  When in READY state, the job can be submitted to Hadoop for execution, with
045     *  the state changing into RUNNING state. From RUNNING state, the job 
046     *  can get into SUCCESS or FAILED state, depending 
047     *  the status of the job execution.
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class ControlledJob {
052      private static final Log LOG = LogFactory.getLog(ControlledJob.class);
053    
054      // A job will be in one of the following states
055      public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
056                                DEPENDENT_FAILED}; 
057      public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
058      private State state;
059      private String controlID;     // assigned and used by JobControl class
060      private Job job;               // mapreduce job to be executed.
061      // some info for human consumption, e.g. the reason why the job failed
062      private String message;
063      // the jobs the current job depends on
064      private List<ControlledJob> dependingJobs;
065            
066      /** 
067       * Construct a job.
068       * @param job a mapreduce job to be executed.
069       * @param dependingJobs an array of jobs the current job depends on
070       */
071      public ControlledJob(Job job, List<ControlledJob> dependingJobs) 
072          throws IOException {
073        this.job = job;
074        this.dependingJobs = dependingJobs;
075        this.state = State.WAITING;
076        this.controlID = "unassigned";
077        this.message = "just initialized";
078      }
079      
080      /**
081       * Construct a job.
082       * 
083       * @param conf mapred job configuration representing a job to be executed.
084       * @throws IOException
085       */
086      public ControlledJob(Configuration conf) throws IOException {
087        this(new Job(conf), null);
088      }
089            
090      @Override
091      public String toString() {
092        StringBuffer sb = new StringBuffer();
093        sb.append("job name:\t").append(this.job.getJobName()).append("\n");
094        sb.append("job id:\t").append(this.controlID).append("\n");
095        sb.append("job state:\t").append(this.state).append("\n");
096        sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
097        sb.append("job message:\t").append(this.message).append("\n");
098                    
099        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100          sb.append("job has no depending job:\t").append("\n");
101        } else {
102          sb.append("job has ").append(this.dependingJobs.size()).
103             append(" dependeng jobs:\n");
104          for (int i = 0; i < this.dependingJobs.size(); i++) {
105            sb.append("\t depending job ").append(i).append(":\t");
106            sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107          }
108        }
109        return sb.toString();
110      }
111            
112      /**
113       * @return the job name of this job
114       */
115      public String getJobName() {
116        return job.getJobName();
117      }
118            
119      /**
120       * Set the job name for  this job.
121       * @param jobName the job name
122       */
123      public void setJobName(String jobName) {
124        job.setJobName(jobName);
125      }
126            
127      /**
128       * @return the job ID of this job assigned by JobControl
129       */
130      public String getJobID() {
131        return this.controlID;
132      }
133            
134      /**
135       * Set the job ID for  this job.
136       * @param id the job ID
137       */
138      public void setJobID(String id) {
139        this.controlID = id;
140      }
141    
142      /**
143       * @return the mapred ID of this job as assigned by the mapred framework.
144       */
145      public synchronized JobID getMapredJobId() {
146        return this.job.getJobID();
147      }
148      
149      /**
150       * @return the mapreduce job 
151       */
152      public synchronized Job getJob() {
153        return this.job;
154      }
155    
156      /**
157       * Set the mapreduce job
158       * @param job the mapreduce job for this job.
159       */
160      public synchronized void setJob(Job job) {
161        this.job = job;
162      }
163    
164      /**
165       * @return the state of this job
166       */
167      public synchronized State getJobState() {
168        return this.state;
169      }
170            
171      /**
172       * Set the state for this job.
173       * @param state the new state for this job.
174       */
175      protected synchronized void setJobState(State state) {
176        this.state = state;
177      }
178            
179      /**
180       * @return the message of this job
181       */
182      public synchronized String getMessage() {
183        return this.message;
184      }
185    
186      /**
187       * Set the message for this job.
188       * @param message the message for this job.
189       */
190      public synchronized void setMessage(String message) {
191        this.message = message;
192      }
193    
194      /**
195       * @return the depending jobs of this job
196       */
197      public List<ControlledJob> getDependentJobs() {
198        return this.dependingJobs;
199      }
200      
201      /**
202       * Add a job to this jobs' dependency list. 
203       * Dependent jobs can only be added while a Job 
204       * is waiting to run, not during or afterwards.
205       * 
206       * @param dependingJob Job that this Job depends on.
207       * @return <tt>true</tt> if the Job was added.
208       */
209      public synchronized boolean addDependingJob(ControlledJob dependingJob) {
210        if (this.state == State.WAITING) { //only allowed to add jobs when waiting
211          if (this.dependingJobs == null) {
212            this.dependingJobs = new ArrayList<ControlledJob>();
213          }
214          return this.dependingJobs.add(dependingJob);
215        } else {
216          return false;
217        }
218      }
219            
220      /**
221       * @return true if this job is in a complete state
222       */
223      public synchronized boolean isCompleted() {
224        return this.state == State.FAILED || 
225          this.state == State.DEPENDENT_FAILED ||
226          this.state == State.SUCCESS;
227      }
228            
229      /**
230       * @return true if this job is in READY state
231       */
232      public synchronized boolean isReady() {
233        return this.state == State.READY;
234      }
235    
236      public void killJob() throws IOException, InterruptedException {
237        job.killJob();
238      }
239      
240      public synchronized void failJob(String message) throws IOException, InterruptedException {
241        try {
242          if(job != null && this.state == State.RUNNING) {
243            job.killJob();
244          }
245        } finally {
246          this.state = State.FAILED;
247          this.message = message;
248        }
249      }
250      
251      /**
252       * Check the state of this running job. The state may 
253       * remain the same, become SUCCESS or FAILED.
254       */
255      private void checkRunningState() throws IOException, InterruptedException {
256        try {
257          if (job.isComplete()) {
258            if (job.isSuccessful()) {
259              this.state = State.SUCCESS;
260            } else {
261              this.state = State.FAILED;
262              this.message = "Job failed!";
263            }
264          }
265        } catch (IOException ioe) {
266          this.state = State.FAILED;
267          this.message = StringUtils.stringifyException(ioe);
268          try {
269            if (job != null) {
270              job.killJob();
271            }
272          } catch (IOException e) {}
273        }
274      }
275            
276      /**
277       * Check and update the state of this job. The state changes  
278       * depending on its current state and the states of the depending jobs.
279       */
280       synchronized State checkState() throws IOException, InterruptedException {
281        if (this.state == State.RUNNING) {
282          checkRunningState();
283        }
284        if (this.state != State.WAITING) {
285          return this.state;
286        }
287        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
288          this.state = State.READY;
289          return this.state;
290        }
291        ControlledJob pred = null;
292        int n = this.dependingJobs.size();
293        for (int i = 0; i < n; i++) {
294          pred = this.dependingJobs.get(i);
295          State s = pred.checkState();
296          if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
297            break; // a pred is still not completed, continue in WAITING
298            // state
299          }
300          if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
301            this.state = State.DEPENDENT_FAILED;
302            this.message = "depending job " + i + " with jobID "
303              + pred.getJobID() + " failed. " + pred.getMessage();
304            break;
305          }
306          // pred must be in success state
307          if (i == n - 1) {
308            this.state = State.READY;
309          }
310        }
311    
312        return this.state;
313      }
314            
315      /**
316       * Submit this job to mapred. The state becomes RUNNING if submission 
317       * is successful, FAILED otherwise.  
318       */
319      protected synchronized void submit() {
320        try {
321          Configuration conf = job.getConfiguration();
322          if (conf.getBoolean(CREATE_DIR, false)) {
323            FileSystem fs = FileSystem.get(conf);
324            Path inputPaths[] = FileInputFormat.getInputPaths(job);
325            for (int i = 0; i < inputPaths.length; i++) {
326              if (!fs.exists(inputPaths[i])) {
327                try {
328                  fs.mkdirs(inputPaths[i]);
329                } catch (IOException e) {
330    
331                }
332              }
333            }
334          }
335          job.submit();
336          this.state = State.RUNNING;
337        } catch (Exception ioe) {
338          LOG.info(getJobName()+" got an error while submitting ",ioe);
339          this.state = State.FAILED;
340          this.message = StringUtils.stringifyException(ioe);
341        }
342      }
343            
344    }