/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *   // Create a new Job
 *   Job job = Job.getInstance();
 *   job.setJarByClass(MyJob.class);
 *
 *   // Specify various job-specific parameters
 *   job.setJobName("myjob");
 *
 *   FileInputFormat.setInputPaths(job, new Path("in"));
 *   FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *   job.setMapperClass(MyJob.MyMapper.class);
 *   job.setReducerClass(MyJob.MyReducer.class);
 *
 *   // Submit the job, then poll for progress until the job is complete
 *   job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}

  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis. */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
      "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis. */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
      "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
      "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
      "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
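   *
   * <p>For example, a sketch of the copy semantics (the key name is
   * illustrative only):</p>
   * <p><blockquote><pre>
   *   Configuration conf = new Configuration();
   *   Job job = Job.getInstance(conf);
   *   // later changes to 'conf' are not visible to the job,
   *   // which works against its own copy:
   *   conf.setInt("example.key", 42);
   * </pre></blockquote></p>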
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
          " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
          ("Job in state " + this.state
           + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need an up-to-date status object, so refresh it
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // fall back to the defaults initialized above
    } catch (InterruptedException ie) {
      // fall back to the defaults initialized above
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return a string describing the task that caused the job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length() - 2);
    return (" task " + taskID + " failed " +
        failCount + " times. For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given task type
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job, either while it is being defined or after
   * it has been submitted.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
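   *
   * <p>A sketch of paging through all currently available events (the batch
   * size of 10 is illustrative):</p>
   * <p><blockquote><pre>
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from, 10);
   *     from += events.length;
   *   } while (events.length > 0);
   * </pre></blockquote></p>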
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent
            .downgrade(events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
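   * <p>For example, a sketch of reading one of the built-in counters:</p>
   * <p><blockquote><pre>
   *   Counters counters = job.getCounters();
   *   if (counters != null) {
   *     long inputRecords =
   *         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   *   }
   * </pre></blockquote></p>
   *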
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
        InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
        OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path of the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
        Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup is needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
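   *
   * <p>For example (the path and fragment are illustrative; the URI fragment
   * names the symlink created in the task's working directory):</p>
   * <p><blockquote><pre>
   *   // new URI(String) may throw URISyntaxException
   *   job.addCacheFile(new URI("/user/me/lookup.dat#lookup"));
   * </pre></blockquote></p>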
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
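   * <p>For example, a typical hprof setting might look like (a sketch, not
   * a prescribed value):</p>
   * <p><blockquote><pre>
   *   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,"
   *       + "force=n,thread=y,verbose=n,file=%s");
   * </pre></blockquote></p>
   *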
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
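   * For example, <code>setProfileTaskRange(true, "0-2")</code> profiles the
   * first three map tasks; the range syntax is that of
   * {@link Configuration.IntegerRanges}.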
   * @param isMap whether the ranges apply to map tasks (true) or reduce
   *              tasks (false)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
        conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
          conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                throws IOException, InterruptedException,
                       ClassNotFoundException {
              return new Cluster(getConfiguration());
            }
          });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
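   *
   * <p>A sketch of submitting and then polling for progress manually
   * (interval and output format are illustrative):</p>
   * <p><blockquote><pre>
   *   job.submit();
   *   while (!job.isComplete()) {
   *     System.out.printf("map %.0f%% reduce %.0f%%%n",
   *         job.mapProgress() * 100, job.reduceProgress() * 100);
   *     Thread.sleep(5000);
   *   }
   * </pre></blockquote></p>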
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
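   *
   * <p>Typically used as the last step of a job driver, for example:
   * <code>System.exit(job.waitForCompletion(true) ? 0 : 1);</code></p>
   *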
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose)
      throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and keep polling
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String[] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String[] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation; null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}