001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.List;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.mapreduce.Job;
034 import org.apache.hadoop.mapreduce.JobID;
035 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
036 import org.apache.hadoop.util.StringUtils;
037
038 /**
039 * This class encapsulates a MapReduce job and its dependency. It monitors
040 * the states of the depending jobs and updates the state of this job.
041 * A job starts in the WAITING state. If it does not have any depending jobs,
042 * or all of the depending jobs are in SUCCESS state, then the job state
043 * will become READY. If any depending jobs fail, the job will fail too.
044 * When in READY state, the job can be submitted to Hadoop for execution, with
045 * the state changing into RUNNING state. From RUNNING state, the job
046 * can get into SUCCESS or FAILED state, depending
047 * the status of the job execution.
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Evolving
051 public class ControlledJob {
052 private static final Log LOG = LogFactory.getLog(ControlledJob.class);
053
054 // A job will be in one of the following states
055 public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
056 DEPENDENT_FAILED};
057 public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
058 private State state;
059 private String controlID; // assigned and used by JobControl class
060 private Job job; // mapreduce job to be executed.
061 // some info for human consumption, e.g. the reason why the job failed
062 private String message;
063 // the jobs the current job depends on
064 private List<ControlledJob> dependingJobs;
065
066 /**
067 * Construct a job.
068 * @param job a mapreduce job to be executed.
069 * @param dependingJobs an array of jobs the current job depends on
070 */
071 public ControlledJob(Job job, List<ControlledJob> dependingJobs)
072 throws IOException {
073 this.job = job;
074 this.dependingJobs = dependingJobs;
075 this.state = State.WAITING;
076 this.controlID = "unassigned";
077 this.message = "just initialized";
078 }
079
080 /**
081 * Construct a job.
082 *
083 * @param conf mapred job configuration representing a job to be executed.
084 * @throws IOException
085 */
086 public ControlledJob(Configuration conf) throws IOException {
087 this(new Job(conf), null);
088 }
089
090 @Override
091 public String toString() {
092 StringBuffer sb = new StringBuffer();
093 sb.append("job name:\t").append(this.job.getJobName()).append("\n");
094 sb.append("job id:\t").append(this.controlID).append("\n");
095 sb.append("job state:\t").append(this.state).append("\n");
096 sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
097 sb.append("job message:\t").append(this.message).append("\n");
098
099 if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100 sb.append("job has no depending job:\t").append("\n");
101 } else {
102 sb.append("job has ").append(this.dependingJobs.size()).
103 append(" dependeng jobs:\n");
104 for (int i = 0; i < this.dependingJobs.size(); i++) {
105 sb.append("\t depending job ").append(i).append(":\t");
106 sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107 }
108 }
109 return sb.toString();
110 }
111
112 /**
113 * @return the job name of this job
114 */
115 public String getJobName() {
116 return job.getJobName();
117 }
118
119 /**
120 * Set the job name for this job.
121 * @param jobName the job name
122 */
123 public void setJobName(String jobName) {
124 job.setJobName(jobName);
125 }
126
127 /**
128 * @return the job ID of this job assigned by JobControl
129 */
130 public String getJobID() {
131 return this.controlID;
132 }
133
134 /**
135 * Set the job ID for this job.
136 * @param id the job ID
137 */
138 public void setJobID(String id) {
139 this.controlID = id;
140 }
141
142 /**
143 * @return the mapred ID of this job as assigned by the mapred framework.
144 */
145 public synchronized JobID getMapredJobId() {
146 return this.job.getJobID();
147 }
148
149 /**
150 * @return the mapreduce job
151 */
152 public synchronized Job getJob() {
153 return this.job;
154 }
155
156 /**
157 * Set the mapreduce job
158 * @param job the mapreduce job for this job.
159 */
160 public synchronized void setJob(Job job) {
161 this.job = job;
162 }
163
164 /**
165 * @return the state of this job
166 */
167 public synchronized State getJobState() {
168 return this.state;
169 }
170
171 /**
172 * Set the state for this job.
173 * @param state the new state for this job.
174 */
175 protected synchronized void setJobState(State state) {
176 this.state = state;
177 }
178
179 /**
180 * @return the message of this job
181 */
182 public synchronized String getMessage() {
183 return this.message;
184 }
185
186 /**
187 * Set the message for this job.
188 * @param message the message for this job.
189 */
190 public synchronized void setMessage(String message) {
191 this.message = message;
192 }
193
194 /**
195 * @return the depending jobs of this job
196 */
197 public List<ControlledJob> getDependentJobs() {
198 return this.dependingJobs;
199 }
200
201 /**
202 * Add a job to this jobs' dependency list.
203 * Dependent jobs can only be added while a Job
204 * is waiting to run, not during or afterwards.
205 *
206 * @param dependingJob Job that this Job depends on.
207 * @return <tt>true</tt> if the Job was added.
208 */
209 public synchronized boolean addDependingJob(ControlledJob dependingJob) {
210 if (this.state == State.WAITING) { //only allowed to add jobs when waiting
211 if (this.dependingJobs == null) {
212 this.dependingJobs = new ArrayList<ControlledJob>();
213 }
214 return this.dependingJobs.add(dependingJob);
215 } else {
216 return false;
217 }
218 }
219
220 /**
221 * @return true if this job is in a complete state
222 */
223 public synchronized boolean isCompleted() {
224 return this.state == State.FAILED ||
225 this.state == State.DEPENDENT_FAILED ||
226 this.state == State.SUCCESS;
227 }
228
229 /**
230 * @return true if this job is in READY state
231 */
232 public synchronized boolean isReady() {
233 return this.state == State.READY;
234 }
235
236 public void killJob() throws IOException, InterruptedException {
237 job.killJob();
238 }
239
240 public synchronized void failJob(String message) throws IOException, InterruptedException {
241 try {
242 if(job != null && this.state == State.RUNNING) {
243 job.killJob();
244 }
245 } finally {
246 this.state = State.FAILED;
247 this.message = message;
248 }
249 }
250
251 /**
252 * Check the state of this running job. The state may
253 * remain the same, become SUCCESS or FAILED.
254 */
255 private void checkRunningState() throws IOException, InterruptedException {
256 try {
257 if (job.isComplete()) {
258 if (job.isSuccessful()) {
259 this.state = State.SUCCESS;
260 } else {
261 this.state = State.FAILED;
262 this.message = "Job failed!";
263 }
264 }
265 } catch (IOException ioe) {
266 this.state = State.FAILED;
267 this.message = StringUtils.stringifyException(ioe);
268 try {
269 if (job != null) {
270 job.killJob();
271 }
272 } catch (IOException e) {}
273 }
274 }
275
276 /**
277 * Check and update the state of this job. The state changes
278 * depending on its current state and the states of the depending jobs.
279 */
280 synchronized State checkState() throws IOException, InterruptedException {
281 if (this.state == State.RUNNING) {
282 checkRunningState();
283 }
284 if (this.state != State.WAITING) {
285 return this.state;
286 }
287 if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
288 this.state = State.READY;
289 return this.state;
290 }
291 ControlledJob pred = null;
292 int n = this.dependingJobs.size();
293 for (int i = 0; i < n; i++) {
294 pred = this.dependingJobs.get(i);
295 State s = pred.checkState();
296 if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
297 break; // a pred is still not completed, continue in WAITING
298 // state
299 }
300 if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
301 this.state = State.DEPENDENT_FAILED;
302 this.message = "depending job " + i + " with jobID "
303 + pred.getJobID() + " failed. " + pred.getMessage();
304 break;
305 }
306 // pred must be in success state
307 if (i == n - 1) {
308 this.state = State.READY;
309 }
310 }
311
312 return this.state;
313 }
314
315 /**
316 * Submit this job to mapred. The state becomes RUNNING if submission
317 * is successful, FAILED otherwise.
318 */
319 protected synchronized void submit() {
320 try {
321 Configuration conf = job.getConfiguration();
322 if (conf.getBoolean(CREATE_DIR, false)) {
323 FileSystem fs = FileSystem.get(conf);
324 Path inputPaths[] = FileInputFormat.getInputPaths(job);
325 for (int i = 0; i < inputPaths.length; i++) {
326 if (!fs.exists(inputPaths[i])) {
327 try {
328 fs.mkdirs(inputPaths[i]);
329 } catch (IOException e) {
330
331 }
332 }
333 }
334 }
335 job.submit();
336 this.state = State.RUNNING;
337 } catch (Exception ioe) {
338 LOG.info(getJobName()+" got an error while submitting ",ioe);
339 this.state = State.FAILED;
340 this.message = StringUtils.stringifyException(ioe);
341 }
342 }
343
344 }