/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState { DEFINE, RUNNING }

  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis. */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
      "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis. */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
      "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
      "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
      "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to the job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
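   * <p>Because of the defensive copy, later changes to the caller's
   * <code>Configuration</code> are not seen by the job. A minimal sketch
   * (the key shown is purely illustrative):</p>
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     conf.set("example.key", "before");
   *     Job job = Job.getInstance(conf);
   *     conf.set("example.key", "after");
   *     // job.getConfiguration().get("example.key") still returns "before"
   * </pre></blockquote></p>
   *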
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given
   * job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration} only when
   * it's needed.
   *
   * @param ignored cluster to be ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored cluster to be ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with the given {@link Cluster},
   * {@link Configuration} and {@link JobStatus}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, attached to the given cluster.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
          " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException(
          "Job in state " + this.state +
          ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore; report whatever information is available
    } catch (InterruptedException ie) {
      // ignore; report whatever information is available
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return the id of the task attempt that caused the job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; details "
          + "can be found in the logs.";
    }
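    // Recover the task id from the attempt id, e.g.
    // "attempt_200707121733_0003_m_000005_0" -> "200707121733_0003_m_000005":
    // split off the "attempt" prefix, then drop the trailing "_<attempt#>"
    // (this assumes a single-digit attempt number).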
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length() - 2);
    return (" task " + taskID + " failed " +
        failCount + " times. For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
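   * <p>A small usage sketch, assuming the job has already been submitted:</p>
   * <p><blockquote><pre>
   *     for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *       System.out.println("map progress: " + report.getProgress());
   *     }
   * </pre></blockquote></p>
   *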
   * @param type Type of the task
   * @return the list of task reports for the given type of task.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(final TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), type);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
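   * <p>A sketch of paging through all events of a submitted job, ten at a
   * time:</p>
   * <p><blockquote><pre>
   *     int from = 0;
   *     TaskCompletionEvent[] events;
   *     do {
   *       events = job.getTaskCompletionEvents(from, 10);
   *       from += events.length;
   *     } while (events.length &gt; 0);
   * </pre></blockquote></p>
   *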
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   * Fetches at most 10 events, starting from the given index.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade(
            events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   * @return <code>true</code> if the kill/fail request was successful.
   * @throws IOException
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
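   * <p>For example, a sketch that reads a built-in counter (and guards
   * against a retired job):</p>
   * <p><blockquote><pre>
   *     Counters counters = job.getCounters();
   *     if (counters != null) {
   *       long inputRecords = counters.findCounter(
   *           TaskCounter.MAP_INPUT_RECORDS).getValue();
   *     }
   * </pre></blockquote></p>
   *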
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
        InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
        OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
        Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner,
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}, else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives to be localized.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files to be localized.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
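   *
   * <p>For example (the path is purely illustrative), the URI fragment names
   * the symlink created in the task's working directory:</p>
   * <p><blockquote><pre>
   *     job.addCacheFile(URI.create("/user/me/lookup.txt#lookup"));
   * </pre></blockquote></p>
   *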
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf,
        archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
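   * <p>For example, a sketch using typical hprof options (these particular
   * options are illustrative, not mandated by this class):</p>
   * <p><blockquote><pre>
   *     job.setProfileParams(
   *         "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
   *         "verbose=n,file=%s");
   * </pre></blockquote></p>
   *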
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
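   *
   * <p>For example, a sketch that profiles the first three map tasks:</p>
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     job.setProfileTaskRange(true, "0-2");
   * </pre></blockquote></p>
   *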
   * @param isMap whether the ranges apply to map tasks (reduce tasks if false)
   * @param newValue a set of integer ranges of the task ids, e.g. "0-2,5"
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
        conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
          conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                throws IOException, InterruptedException,
                       ClassNotFoundException {
              return new Cluster(getConfiguration());
            }
          });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
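   *
   * <p>A typical driver ends with the following sketch:</p>
   * <p><blockquote><pre>
   *     System.exit(job.waitForCompletion(true) ? 0 : 1);
   * </pre></blockquote></p>
   *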
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
      ) throws IOException, InterruptedException,
               ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore and poll the job status again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String[] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String[] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
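   * <p>For example, a sketch that makes all task completion events visible in
   * the client logs:</p>
   * <p><blockquote><pre>
   *     Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
   * </pre></blockquote></p>
   *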
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}