001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce; 020 021 import java.io.IOException; 022 import java.net.URI; 023 import java.security.PrivilegedExceptionAction; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceStability; 029 import org.apache.hadoop.classification.InterfaceAudience.Private; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.conf.Configuration.IntegerRanges; 032 import org.apache.hadoop.fs.FileSystem; 033 import org.apache.hadoop.fs.Path; 034 import org.apache.hadoop.io.RawComparator; 035 import org.apache.hadoop.mapred.JobConf; 036 import org.apache.hadoop.mapreduce.filecache.DistributedCache; 037 import org.apache.hadoop.mapreduce.protocol.ClientProtocol; 038 import org.apache.hadoop.mapreduce.task.JobContextImpl; 039 import org.apache.hadoop.mapreduce.util.ConfigUtil; 040 import org.apache.hadoop.util.StringUtils; 041 import org.apache.hadoop.yarn.api.records.ReservationId; 042 043 /** 044 * The job submitter's view of the Job. 045 * 046 * <p>It allows the user to configure the 047 * job, submit it, control its execution, and query the state. The set methods 048 * only work until the job is submitted, afterwards they will throw an 049 * IllegalStateException. </p> 050 * 051 * <p> 052 * Normally the user creates the application, describes various facets of the 053 * job via {@link Job} and then submits the job and monitor its progress.</p> 054 * 055 * <p>Here is an example on how to submit a job:</p> 056 * <p><blockquote><pre> 057 * // Create a new Job 058 * Job job = Job.getInstance(); 059 * job.setJarByClass(MyJob.class); 060 * 061 * // Specify various job-specific parameters 062 * job.setJobName("myjob"); 063 * 064 * job.setInputPath(new Path("in")); 065 * job.setOutputPath(new Path("out")); 066 * 067 * job.setMapperClass(MyJob.MyMapper.class); 068 * job.setReducerClass(MyJob.MyReducer.class); 069 * 070 * // Submit the job, then poll for progress until the job is complete 071 * job.waitForCompletion(true); 072 * </pre></blockquote></p> 073 * 074 * 075 */ 076 @InterfaceAudience.Public 077 @InterfaceStability.Evolving 078 public class Job extends JobContextImpl implements JobContext { 079 private static final Log LOG = LogFactory.getLog(Job.class); 080 081 @InterfaceStability.Evolving 082 public static enum JobState {DEFINE, RUNNING}; 083 private static final long MAX_JOBSTATUS_AGE = 1000 * 2; 084 public static final String OUTPUT_FILTER = "mapreduce.client.output.filter"; 085 /** Key in mapred-*.xml that sets completionPollInvervalMillis */ 086 public static final String COMPLETION_POLL_INTERVAL_KEY = 087 "mapreduce.client.completion.pollinterval"; 088 089 /** Default completionPollIntervalMillis is 5000 ms. */ 090 static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; 091 /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */ 092 public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = 093 "mapreduce.client.progressmonitor.pollinterval"; 094 /** Default progMonitorPollIntervalMillis is 1000 ms. */ 095 static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; 096 097 public static final String USED_GENERIC_PARSER = 098 "mapreduce.client.genericoptionsparser.used"; 099 public static final String SUBMIT_REPLICATION = 100 "mapreduce.client.submit.file.replication"; 101 private static final String TASKLOG_PULL_TIMEOUT_KEY = 102 "mapreduce.client.tasklog.timeout"; 103 private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; 104 105 @InterfaceStability.Evolving 106 public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } 107 108 static { 109 ConfigUtil.loadResources(); 110 } 111 112 private JobState state = JobState.DEFINE; 113 private JobStatus status; 114 private long statustime; 115 private Cluster cluster; 116 private ReservationId reservationId; 117 118 /** 119 * @deprecated Use {@link #getInstance()} 120 */ 121 @Deprecated 122 public Job() throws IOException { 123 this(new Configuration()); 124 } 125 126 /** 127 * @deprecated Use {@link #getInstance(Configuration)} 128 */ 129 @Deprecated 130 public Job(Configuration conf) throws IOException { 131 this(new JobConf(conf)); 132 } 133 134 /** 135 * @deprecated Use {@link #getInstance(Configuration, String)} 136 */ 137 @Deprecated 138 public Job(Configuration conf, String jobName) throws IOException { 139 this(conf); 140 setJobName(jobName); 141 } 142 143 Job(JobConf conf) throws IOException { 144 super(conf, null); 145 // propagate existing user credentials to job 146 this.credentials.mergeAll(this.ugi.getCredentials()); 147 this.cluster = null; 148 } 149 150 Job(JobStatus status, JobConf conf) throws IOException { 151 this(conf); 152 setJobID(status.getJobID()); 153 this.status = status; 154 state = JobState.RUNNING; 155 } 156 157 158 /** 159 * Creates a new {@link Job} with no particular {@link Cluster} . 160 * A Cluster will be created with a generic {@link Configuration}. 161 * 162 * @return the {@link Job} , with no connection to a cluster yet. 163 * @throws IOException 164 */ 165 public static Job getInstance() throws IOException { 166 // create with a null Cluster 167 return getInstance(new Configuration()); 168 } 169 170 /** 171 * Creates a new {@link Job} with no particular {@link Cluster} and a 172 * given {@link Configuration}. 173 * 174 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 175 * that any necessary internal modifications do not reflect on the incoming 176 * parameter. 177 * 178 * A Cluster will be created from the conf parameter only when it's needed. 179 * 180 * @param conf the configuration 181 * @return the {@link Job} , with no connection to a cluster yet. 182 * @throws IOException 183 */ 184 public static Job getInstance(Configuration conf) throws IOException { 185 // create with a null Cluster 186 JobConf jobConf = new JobConf(conf); 187 return new Job(jobConf); 188 } 189 190 191 /** 192 * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName. 193 * A Cluster will be created from the conf parameter only when it's needed. 194 * 195 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 196 * that any necessary internal modifications do not reflect on the incoming 197 * parameter. 198 * 199 * @param conf the configuration 200 * @return the {@link Job} , with no connection to a cluster yet. 201 * @throws IOException 202 */ 203 public static Job getInstance(Configuration conf, String jobName) 204 throws IOException { 205 // create with a null Cluster 206 Job result = getInstance(conf); 207 result.setJobName(jobName); 208 return result; 209 } 210 211 /** 212 * Creates a new {@link Job} with no particular {@link Cluster} and given 213 * {@link Configuration} and {@link JobStatus}. 214 * A Cluster will be created from the conf parameter only when it's needed. 215 * 216 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 217 * that any necessary internal modifications do not reflect on the incoming 218 * parameter. 219 * 220 * @param status job status 221 * @param conf job configuration 222 * @return the {@link Job} , with no connection to a cluster yet. 223 * @throws IOException 224 */ 225 public static Job getInstance(JobStatus status, Configuration conf) 226 throws IOException { 227 return new Job(status, new JobConf(conf)); 228 } 229 230 /** 231 * Creates a new {@link Job} with no particular {@link Cluster}. 232 * A Cluster will be created from the conf parameter only when it's needed. 233 * 234 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 235 * that any necessary internal modifications do not reflect on the incoming 236 * parameter. 237 * 238 * @param ignored 239 * @return the {@link Job} , with no connection to a cluster yet. 240 * @throws IOException 241 * @deprecated Use {@link #getInstance()} 242 */ 243 @Deprecated 244 public static Job getInstance(Cluster ignored) throws IOException { 245 return getInstance(); 246 } 247 248 /** 249 * Creates a new {@link Job} with no particular {@link Cluster} and given 250 * {@link Configuration}. 251 * A Cluster will be created from the conf parameter only when it's needed. 252 * 253 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 254 * that any necessary internal modifications do not reflect on the incoming 255 * parameter. 256 * 257 * @param ignored 258 * @param conf job configuration 259 * @return the {@link Job} , with no connection to a cluster yet. 260 * @throws IOException 261 * @deprecated Use {@link #getInstance(Configuration)} 262 */ 263 @Deprecated 264 public static Job getInstance(Cluster ignored, Configuration conf) 265 throws IOException { 266 return getInstance(conf); 267 } 268 269 /** 270 * Creates a new {@link Job} with no particular {@link Cluster} and given 271 * {@link Configuration} and {@link JobStatus}. 272 * A Cluster will be created from the conf parameter only when it's needed. 273 * 274 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 275 * that any necessary internal modifications do not reflect on the incoming 276 * parameter. 277 * 278 * @param cluster cluster 279 * @param status job status 280 * @param conf job configuration 281 * @return the {@link Job} , with no connection to a cluster yet. 282 * @throws IOException 283 */ 284 @Private 285 public static Job getInstance(Cluster cluster, JobStatus status, 286 Configuration conf) throws IOException { 287 Job job = getInstance(status, conf); 288 job.setCluster(cluster); 289 return job; 290 } 291 292 private void ensureState(JobState state) throws IllegalStateException { 293 if (state != this.state) { 294 throw new IllegalStateException("Job in state "+ this.state + 295 " instead of " + state); 296 } 297 298 if (state == JobState.RUNNING && cluster == null) { 299 throw new IllegalStateException 300 ("Job in state " + this.state 301 + ", but it isn't attached to any job tracker!"); 302 } 303 } 304 305 /** 306 * Some methods rely on having a recent job status object. Refresh 307 * it, if necessary 308 */ 309 synchronized void ensureFreshStatus() 310 throws IOException { 311 if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) { 312 updateStatus(); 313 } 314 } 315 316 /** Some methods need to update status immediately. So, refresh 317 * immediately 318 * @throws IOException 319 */ 320 synchronized void updateStatus() throws IOException { 321 try { 322 this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { 323 @Override 324 public JobStatus run() throws IOException, InterruptedException { 325 return cluster.getClient().getJobStatus(status.getJobID()); 326 } 327 }); 328 } 329 catch (InterruptedException ie) { 330 throw new IOException(ie); 331 } 332 if (this.status == null) { 333 throw new IOException("Job status not available "); 334 } 335 this.statustime = System.currentTimeMillis(); 336 } 337 338 public JobStatus getStatus() throws IOException, InterruptedException { 339 ensureState(JobState.RUNNING); 340 updateStatus(); 341 return status; 342 } 343 344 private void setStatus(JobStatus status) { 345 this.status = status; 346 } 347 348 /** 349 * Returns the current state of the Job. 350 * 351 * @return JobStatus#State 352 * @throws IOException 353 * @throws InterruptedException 354 */ 355 public JobStatus.State getJobState() 356 throws IOException, InterruptedException { 357 ensureState(JobState.RUNNING); 358 updateStatus(); 359 return status.getState(); 360 } 361 362 /** 363 * Get the URL where some job progress information will be displayed. 364 * 365 * @return the URL where some job progress information will be displayed. 366 */ 367 public String getTrackingURL(){ 368 ensureState(JobState.RUNNING); 369 return status.getTrackingUrl().toString(); 370 } 371 372 /** 373 * Get the path of the submitted job configuration. 374 * 375 * @return the path of the submitted job configuration. 376 */ 377 public String getJobFile() { 378 ensureState(JobState.RUNNING); 379 return status.getJobFile(); 380 } 381 382 /** 383 * Get start time of the job. 384 * 385 * @return the start time of the job 386 */ 387 public long getStartTime() { 388 ensureState(JobState.RUNNING); 389 return status.getStartTime(); 390 } 391 392 /** 393 * Get finish time of the job. 394 * 395 * @return the finish time of the job 396 */ 397 public long getFinishTime() throws IOException, InterruptedException { 398 ensureState(JobState.RUNNING); 399 updateStatus(); 400 return status.getFinishTime(); 401 } 402 403 /** 404 * Get scheduling info of the job. 405 * 406 * @return the scheduling info of the job 407 */ 408 public String getSchedulingInfo() { 409 ensureState(JobState.RUNNING); 410 return status.getSchedulingInfo(); 411 } 412 413 /** 414 * Get scheduling info of the job. 415 * 416 * @return the scheduling info of the job 417 */ 418 public JobPriority getPriority() throws IOException, InterruptedException { 419 ensureState(JobState.RUNNING); 420 updateStatus(); 421 return status.getPriority(); 422 } 423 424 /** 425 * The user-specified job name. 426 */ 427 public String getJobName() { 428 if (state == JobState.DEFINE) { 429 return super.getJobName(); 430 } 431 ensureState(JobState.RUNNING); 432 return status.getJobName(); 433 } 434 435 public String getHistoryUrl() throws IOException, InterruptedException { 436 ensureState(JobState.RUNNING); 437 updateStatus(); 438 return status.getHistoryFile(); 439 } 440 441 public boolean isRetired() throws IOException, InterruptedException { 442 ensureState(JobState.RUNNING); 443 updateStatus(); 444 return status.isRetired(); 445 } 446 447 @Private 448 public Cluster getCluster() { 449 return cluster; 450 } 451 452 /** Only for mocks in unit tests. */ 453 @Private 454 private void setCluster(Cluster cluster) { 455 this.cluster = cluster; 456 } 457 458 /** 459 * Dump stats to screen. 460 */ 461 @Override 462 public String toString() { 463 ensureState(JobState.RUNNING); 464 String reasonforFailure = " "; 465 int numMaps = 0; 466 int numReduces = 0; 467 try { 468 updateStatus(); 469 if (status.getState().equals(JobStatus.State.FAILED)) 470 reasonforFailure = getTaskFailureEventString(); 471 numMaps = getTaskReports(TaskType.MAP).length; 472 numReduces = getTaskReports(TaskType.REDUCE).length; 473 } catch (IOException e) { 474 } catch (InterruptedException ie) { 475 } 476 StringBuffer sb = new StringBuffer(); 477 sb.append("Job: ").append(status.getJobID()).append("\n"); 478 sb.append("Job File: ").append(status.getJobFile()).append("\n"); 479 sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); 480 sb.append("\n"); 481 sb.append("Uber job : ").append(status.isUber()).append("\n"); 482 sb.append("Number of maps: ").append(numMaps).append("\n"); 483 sb.append("Number of reduces: ").append(numReduces).append("\n"); 484 sb.append("map() completion: "); 485 sb.append(status.getMapProgress()).append("\n"); 486 sb.append("reduce() completion: "); 487 sb.append(status.getReduceProgress()).append("\n"); 488 sb.append("Job state: "); 489 sb.append(status.getState()).append("\n"); 490 sb.append("retired: ").append(status.isRetired()).append("\n"); 491 sb.append("reason for failure: ").append(reasonforFailure); 492 return sb.toString(); 493 } 494 495 /** 496 * @return taskid which caused job failure 497 * @throws IOException 498 * @throws InterruptedException 499 */ 500 String getTaskFailureEventString() throws IOException, 501 InterruptedException { 502 int failCount = 1; 503 TaskCompletionEvent lastEvent = null; 504 TaskCompletionEvent[] events = ugi.doAs(new 505 PrivilegedExceptionAction<TaskCompletionEvent[]>() { 506 @Override 507 public TaskCompletionEvent[] run() throws IOException, 508 InterruptedException { 509 return cluster.getClient().getTaskCompletionEvents( 510 status.getJobID(), 0, 10); 511 } 512 }); 513 for (TaskCompletionEvent event : events) { 514 if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) { 515 failCount++; 516 lastEvent = event; 517 } 518 } 519 if (lastEvent == null) { 520 return "There are no failed tasks for the job. " 521 + "Job is failed due to some other reason and reason " 522 + "can be found in the logs."; 523 } 524 String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2); 525 String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2); 526 return (" task " + taskID + " failed " + 527 failCount + " times " + "For details check tasktracker at: " + 528 lastEvent.getTaskTrackerHttp()); 529 } 530 531 /** 532 * Get the information of the current state of the tasks of a job. 533 * 534 * @param type Type of the task 535 * @return the list of all of the map tips. 536 * @throws IOException 537 */ 538 public TaskReport[] getTaskReports(TaskType type) 539 throws IOException, InterruptedException { 540 ensureState(JobState.RUNNING); 541 final TaskType tmpType = type; 542 return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() { 543 public TaskReport[] run() throws IOException, InterruptedException { 544 return cluster.getClient().getTaskReports(getJobID(), tmpType); 545 } 546 }); 547 } 548 549 /** 550 * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 551 * and 1.0. When all map tasks have completed, the function returns 1.0. 552 * 553 * @return the progress of the job's map-tasks. 554 * @throws IOException 555 */ 556 public float mapProgress() throws IOException { 557 ensureState(JobState.RUNNING); 558 ensureFreshStatus(); 559 return status.getMapProgress(); 560 } 561 562 /** 563 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 564 * and 1.0. When all reduce tasks have completed, the function returns 1.0. 565 * 566 * @return the progress of the job's reduce-tasks. 567 * @throws IOException 568 */ 569 public float reduceProgress() throws IOException { 570 ensureState(JobState.RUNNING); 571 ensureFreshStatus(); 572 return status.getReduceProgress(); 573 } 574 575 /** 576 * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 577 * and 1.0. When all cleanup tasks have completed, the function returns 1.0. 578 * 579 * @return the progress of the job's cleanup-tasks. 580 * @throws IOException 581 */ 582 public float cleanupProgress() throws IOException, InterruptedException { 583 ensureState(JobState.RUNNING); 584 ensureFreshStatus(); 585 return status.getCleanupProgress(); 586 } 587 588 /** 589 * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 590 * and 1.0. When all setup tasks have completed, the function returns 1.0. 591 * 592 * @return the progress of the job's setup-tasks. 593 * @throws IOException 594 */ 595 public float setupProgress() throws IOException { 596 ensureState(JobState.RUNNING); 597 ensureFreshStatus(); 598 return status.getSetupProgress(); 599 } 600 601 /** 602 * Check if the job is finished or not. 603 * This is a non-blocking call. 604 * 605 * @return <code>true</code> if the job is complete, else <code>false</code>. 606 * @throws IOException 607 */ 608 public boolean isComplete() throws IOException { 609 ensureState(JobState.RUNNING); 610 updateStatus(); 611 return status.isJobComplete(); 612 } 613 614 /** 615 * Check if the job completed successfully. 616 * 617 * @return <code>true</code> if the job succeeded, else <code>false</code>. 618 * @throws IOException 619 */ 620 public boolean isSuccessful() throws IOException { 621 ensureState(JobState.RUNNING); 622 updateStatus(); 623 return status.getState() == JobStatus.State.SUCCEEDED; 624 } 625 626 /** 627 * Kill the running job. Blocks until all job tasks have been 628 * killed as well. If the job is no longer running, it simply returns. 629 * 630 * @throws IOException 631 */ 632 public void killJob() throws IOException { 633 ensureState(JobState.RUNNING); 634 try { 635 cluster.getClient().killJob(getJobID()); 636 } 637 catch (InterruptedException ie) { 638 throw new IOException(ie); 639 } 640 } 641 642 /** 643 * Set the priority of a running job. 644 * @param priority the new priority for the job. 645 * @throws IOException 646 */ 647 public void setPriority(JobPriority priority) 648 throws IOException, InterruptedException { 649 if (state == JobState.DEFINE) { 650 conf.setJobPriority( 651 org.apache.hadoop.mapred.JobPriority.valueOf(priority.name())); 652 } else { 653 ensureState(JobState.RUNNING); 654 final JobPriority tmpPriority = priority; 655 ugi.doAs(new PrivilegedExceptionAction<Object>() { 656 @Override 657 public Object run() throws IOException, InterruptedException { 658 cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString()); 659 return null; 660 } 661 }); 662 } 663 } 664 665 /** 666 * Get events indicating completion (success/failure) of component tasks. 667 * 668 * @param startFrom index to start fetching events from 669 * @param numEvents number of events to fetch 670 * @return an array of {@link TaskCompletionEvent}s 671 * @throws IOException 672 */ 673 public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom, 674 final int numEvents) throws IOException, InterruptedException { 675 ensureState(JobState.RUNNING); 676 return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() { 677 @Override 678 public TaskCompletionEvent[] run() throws IOException, InterruptedException { 679 return cluster.getClient().getTaskCompletionEvents(getJobID(), 680 startFrom, numEvents); 681 } 682 }); 683 } 684 685 /** 686 * Get events indicating completion (success/failure) of component tasks. 687 * 688 * @param startFrom index to start fetching events from 689 * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s 690 * @throws IOException 691 */ 692 public org.apache.hadoop.mapred.TaskCompletionEvent[] 693 getTaskCompletionEvents(final int startFrom) throws IOException { 694 try { 695 TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10); 696 org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents = 697 new org.apache.hadoop.mapred.TaskCompletionEvent[events.length]; 698 for (int i = 0; i < events.length; i++) { 699 retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade 700 (events[i]); 701 } 702 return retEvents; 703 } catch (InterruptedException ie) { 704 throw new IOException(ie); 705 } 706 } 707 708 /** 709 * Kill indicated task attempt. 710 * @param taskId the id of the task to kill. 711 * @param shouldFail if <code>true</code> the task is failed and added 712 * to failed tasks list, otherwise it is just killed, 713 * w/o affecting job failure status. 714 */ 715 @Private 716 public boolean killTask(final TaskAttemptID taskId, 717 final boolean shouldFail) throws IOException { 718 ensureState(JobState.RUNNING); 719 try { 720 return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { 721 public Boolean run() throws IOException, InterruptedException { 722 return cluster.getClient().killTask(taskId, shouldFail); 723 } 724 }); 725 } 726 catch (InterruptedException ie) { 727 throw new IOException(ie); 728 } 729 } 730 731 /** 732 * Kill indicated task attempt. 733 * 734 * @param taskId the id of the task to be terminated. 735 * @throws IOException 736 */ 737 public void killTask(final TaskAttemptID taskId) 738 throws IOException { 739 killTask(taskId, false); 740 } 741 742 /** 743 * Fail indicated task attempt. 744 * 745 * @param taskId the id of the task to be terminated. 746 * @throws IOException 747 */ 748 public void failTask(final TaskAttemptID taskId) 749 throws IOException { 750 killTask(taskId, true); 751 } 752 753 /** 754 * Gets the counters for this job. May return null if the job has been 755 * retired and the job is no longer in the completed job store. 756 * 757 * @return the counters for this job. 758 * @throws IOException 759 */ 760 public Counters getCounters() 761 throws IOException { 762 ensureState(JobState.RUNNING); 763 try { 764 return ugi.doAs(new PrivilegedExceptionAction<Counters>() { 765 @Override 766 public Counters run() throws IOException, InterruptedException { 767 return cluster.getClient().getJobCounters(getJobID()); 768 } 769 }); 770 } 771 catch (InterruptedException ie) { 772 throw new IOException(ie); 773 } 774 } 775 776 /** 777 * Gets the diagnostic messages for a given task attempt. 778 * @param taskid 779 * @return the list of diagnostic messages for the task 780 * @throws IOException 781 */ 782 public String[] getTaskDiagnostics(final TaskAttemptID taskid) 783 throws IOException, InterruptedException { 784 ensureState(JobState.RUNNING); 785 return ugi.doAs(new PrivilegedExceptionAction<String[]>() { 786 @Override 787 public String[] run() throws IOException, InterruptedException { 788 return cluster.getClient().getTaskDiagnostics(taskid); 789 } 790 }); 791 } 792 793 /** 794 * Set the number of reduce tasks for the job. 795 * @param tasks the number of reduce tasks 796 * @throws IllegalStateException if the job is submitted 797 */ 798 public void setNumReduceTasks(int tasks) throws IllegalStateException { 799 ensureState(JobState.DEFINE); 800 conf.setNumReduceTasks(tasks); 801 } 802 803 /** 804 * Set the current working directory for the default file system. 805 * 806 * @param dir the new current working directory. 807 * @throws IllegalStateException if the job is submitted 808 */ 809 public void setWorkingDirectory(Path dir) throws IOException { 810 ensureState(JobState.DEFINE); 811 conf.setWorkingDirectory(dir); 812 } 813 814 /** 815 * Set the {@link InputFormat} for the job. 816 * @param cls the <code>InputFormat</code> to use 817 * @throws IllegalStateException if the job is submitted 818 */ 819 public void setInputFormatClass(Class<? extends InputFormat> cls 820 ) throws IllegalStateException { 821 ensureState(JobState.DEFINE); 822 conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 823 InputFormat.class); 824 } 825 826 /** 827 * Set the {@link OutputFormat} for the job. 828 * @param cls the <code>OutputFormat</code> to use 829 * @throws IllegalStateException if the job is submitted 830 */ 831 public void setOutputFormatClass(Class<? extends OutputFormat> cls 832 ) throws IllegalStateException { 833 ensureState(JobState.DEFINE); 834 conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 835 OutputFormat.class); 836 } 837 838 /** 839 * Set the {@link Mapper} for the job. 840 * @param cls the <code>Mapper</code> to use 841 * @throws IllegalStateException if the job is submitted 842 */ 843 public void setMapperClass(Class<? extends Mapper> cls 844 ) throws IllegalStateException { 845 ensureState(JobState.DEFINE); 846 conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class); 847 } 848 849 /** 850 * Set the Jar by finding where a given class came from. 851 * @param cls the example class 852 */ 853 public void setJarByClass(Class<?> cls) { 854 ensureState(JobState.DEFINE); 855 conf.setJarByClass(cls); 856 } 857 858 /** 859 * Set the job jar 860 */ 861 public void setJar(String jar) { 862 ensureState(JobState.DEFINE); 863 conf.setJar(jar); 864 } 865 866 /** 867 * Set the reported username for this job. 868 * 869 * @param user the username for this job. 870 */ 871 public void setUser(String user) { 872 ensureState(JobState.DEFINE); 873 conf.setUser(user); 874 } 875 876 /** 877 * Set the combiner class for the job. 878 * @param cls the combiner to use 879 * @throws IllegalStateException if the job is submitted 880 */ 881 public void setCombinerClass(Class<? extends Reducer> cls 882 ) throws IllegalStateException { 883 ensureState(JobState.DEFINE); 884 conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); 885 } 886 887 /** 888 * Set the {@link Reducer} for the job. 889 * @param cls the <code>Reducer</code> to use 890 * @throws IllegalStateException if the job is submitted 891 */ 892 public void setReducerClass(Class<? extends Reducer> cls 893 ) throws IllegalStateException { 894 ensureState(JobState.DEFINE); 895 conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class); 896 } 897 898 /** 899 * Set the {@link Partitioner} for the job. 900 * @param cls the <code>Partitioner</code> to use 901 * @throws IllegalStateException if the job is submitted 902 */ 903 public void setPartitionerClass(Class<? extends Partitioner> cls 904 ) throws IllegalStateException { 905 ensureState(JobState.DEFINE); 906 conf.setClass(PARTITIONER_CLASS_ATTR, cls, 907 Partitioner.class); 908 } 909 910 /** 911 * Set the key class for the map output data. This allows the user to 912 * specify the map output key class to be different than the final output 913 * value class. 914 * 915 * @param theClass the map output key class. 916 * @throws IllegalStateException if the job is submitted 917 */ 918 public void setMapOutputKeyClass(Class<?> theClass 919 ) throws IllegalStateException { 920 ensureState(JobState.DEFINE); 921 conf.setMapOutputKeyClass(theClass); 922 } 923 924 /** 925 * Set the value class for the map output data. This allows the user to 926 * specify the map output value class to be different than the final output 927 * value class. 928 * 929 * @param theClass the map output value class. 930 * @throws IllegalStateException if the job is submitted 931 */ 932 public void setMapOutputValueClass(Class<?> theClass 933 ) throws IllegalStateException { 934 ensureState(JobState.DEFINE); 935 conf.setMapOutputValueClass(theClass); 936 } 937 938 /** 939 * Set the key class for the job output data. 940 * 941 * @param theClass the key class for the job output data. 942 * @throws IllegalStateException if the job is submitted 943 */ 944 public void setOutputKeyClass(Class<?> theClass 945 ) throws IllegalStateException { 946 ensureState(JobState.DEFINE); 947 conf.setOutputKeyClass(theClass); 948 } 949 950 /** 951 * Set the value class for job outputs. 952 * 953 * @param theClass the value class for job outputs. 954 * @throws IllegalStateException if the job is submitted 955 */ 956 public void setOutputValueClass(Class<?> theClass 957 ) throws IllegalStateException { 958 ensureState(JobState.DEFINE); 959 conf.setOutputValueClass(theClass); 960 } 961 962 /** 963 * Define the comparator that controls which keys are grouped together 964 * for a single call to combiner, 965 * {@link Reducer#reduce(Object, Iterable, 966 * org.apache.hadoop.mapreduce.Reducer.Context)} 967 * 968 * @param cls the raw comparator to use 969 * @throws IllegalStateException if the job is submitted 970 */ 971 public void setCombinerKeyGroupingComparatorClass( 972 Class<? extends RawComparator> cls) throws IllegalStateException { 973 ensureState(JobState.DEFINE); 974 conf.setCombinerKeyGroupingComparator(cls); 975 } 976 977 /** 978 * Define the comparator that controls how the keys are sorted before they 979 * are passed to the {@link Reducer}. 980 * @param cls the raw comparator 981 * @throws IllegalStateException if the job is submitted 982 * @see #setCombinerKeyGroupingComparatorClass(Class) 983 */ 984 public void setSortComparatorClass(Class<? extends RawComparator> cls 985 ) throws IllegalStateException { 986 ensureState(JobState.DEFINE); 987 conf.setOutputKeyComparatorClass(cls); 988 } 989 990 /** 991 * Define the comparator that controls which keys are grouped together 992 * for a single call to 993 * {@link Reducer#reduce(Object, Iterable, 994 * org.apache.hadoop.mapreduce.Reducer.Context)} 995 * @param cls the raw comparator to use 996 * @throws IllegalStateException if the job is submitted 997 * @see #setCombinerKeyGroupingComparatorClass(Class) 998 */ 999 public void setGroupingComparatorClass(Class<? extends RawComparator> cls 1000 ) throws IllegalStateException { 1001 ensureState(JobState.DEFINE); 1002 conf.setOutputValueGroupingComparator(cls); 1003 } 1004 1005 /** 1006 * Set the user-specified job name. 1007 * 1008 * @param name the job's new name. 1009 * @throws IllegalStateException if the job is submitted 1010 */ 1011 public void setJobName(String name) throws IllegalStateException { 1012 ensureState(JobState.DEFINE); 1013 conf.setJobName(name); 1014 } 1015 1016 /** 1017 * Turn speculative execution on or off for this job. 1018 * 1019 * @param speculativeExecution <code>true</code> if speculative execution 1020 * should be turned on, else <code>false</code>. 1021 */ 1022 public void setSpeculativeExecution(boolean speculativeExecution) { 1023 ensureState(JobState.DEFINE); 1024 conf.setSpeculativeExecution(speculativeExecution); 1025 } 1026 1027 /** 1028 * Turn speculative execution on or off for this job for map tasks. 1029 * 1030 * @param speculativeExecution <code>true</code> if speculative execution 1031 * should be turned on for map tasks, 1032 * else <code>false</code>. 1033 */ 1034 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1035 ensureState(JobState.DEFINE); 1036 conf.setMapSpeculativeExecution(speculativeExecution); 1037 } 1038 1039 /** 1040 * Turn speculative execution on or off for this job for reduce tasks. 1041 * 1042 * @param speculativeExecution <code>true</code> if speculative execution 1043 * should be turned on for reduce tasks, 1044 * else <code>false</code>. 1045 */ 1046 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1047 ensureState(JobState.DEFINE); 1048 conf.setReduceSpeculativeExecution(speculativeExecution); 1049 } 1050 1051 /** 1052 * Specify whether job-setup and job-cleanup is needed for the job 1053 * 1054 * @param needed If <code>true</code>, job-setup and job-cleanup will be 1055 * considered from {@link OutputCommitter} 1056 * else ignored. 1057 */ 1058 public void setJobSetupCleanupNeeded(boolean needed) { 1059 ensureState(JobState.DEFINE); 1060 conf.setBoolean(SETUP_CLEANUP_NEEDED, needed); 1061 } 1062 1063 /** 1064 * Set the given set of archives 1065 * @param archives The list of archives that need to be localized 1066 */ 1067 public void setCacheArchives(URI[] archives) { 1068 ensureState(JobState.DEFINE); 1069 DistributedCache.setCacheArchives(archives, conf); 1070 } 1071 1072 /** 1073 * Set the given set of files 1074 * @param files The list of files that need to be localized 1075 */ 1076 public void setCacheFiles(URI[] files) { 1077 ensureState(JobState.DEFINE); 1078 DistributedCache.setCacheFiles(files, conf); 1079 } 1080 1081 /** 1082 * Add a archives to be localized 1083 * @param uri The uri of the cache to be localized 1084 */ 1085 public void addCacheArchive(URI uri) { 1086 ensureState(JobState.DEFINE); 1087 DistributedCache.addCacheArchive(uri, conf); 1088 } 1089 1090 /** 1091 * Add a file to be localized 1092 * @param uri The uri of the cache to be localized 1093 */ 1094 public void addCacheFile(URI uri) { 1095 ensureState(JobState.DEFINE); 1096 DistributedCache.addCacheFile(uri, conf); 1097 } 1098 1099 /** 1100 * Add an file path to the current set of classpath entries It adds the file 1101 * to cache as well. 1102 * 1103 * Files added with this method will not be unpacked while being added to the 1104 * classpath. 1105 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)} 1106 * method instead. 1107 * 1108 * @param file Path of the file to be added 1109 */ 1110 public void addFileToClassPath(Path file) 1111 throws IOException { 1112 ensureState(JobState.DEFINE); 1113 DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf)); 1114 } 1115 1116 /** 1117 * Add an archive path to the current set of classpath entries. It adds the 1118 * archive to cache as well. 1119 * 1120 * Archive files will be unpacked and added to the classpath 1121 * when being distributed. 1122 * 1123 * @param archive Path of the archive to be added 1124 */ 1125 public void addArchiveToClassPath(Path archive) 1126 throws IOException { 1127 ensureState(JobState.DEFINE); 1128 DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); 1129 } 1130 1131 /** 1132 * Originally intended to enable symlinks, but currently symlinks cannot be 1133 * disabled. 1134 */ 1135 @Deprecated 1136 public void createSymlink() { 1137 ensureState(JobState.DEFINE); 1138 DistributedCache.createSymlink(conf); 1139 } 1140 1141 /** 1142 * Expert: Set the number of maximum attempts that will be made to run a 1143 * map task. 1144 * 1145 * @param n the number of attempts per map task. 1146 */ 1147 public void setMaxMapAttempts(int n) { 1148 ensureState(JobState.DEFINE); 1149 conf.setMaxMapAttempts(n); 1150 } 1151 1152 /** 1153 * Expert: Set the number of maximum attempts that will be made to run a 1154 * reduce task. 1155 * 1156 * @param n the number of attempts per reduce task. 1157 */ 1158 public void setMaxReduceAttempts(int n) { 1159 ensureState(JobState.DEFINE); 1160 conf.setMaxReduceAttempts(n); 1161 } 1162 1163 /** 1164 * Set whether the system should collect profiler information for some of 1165 * the tasks in this job? The information is stored in the user log 1166 * directory. 1167 * @param newValue true means it should be gathered 1168 */ 1169 public void setProfileEnabled(boolean newValue) { 1170 ensureState(JobState.DEFINE); 1171 conf.setProfileEnabled(newValue); 1172 } 1173 1174 /** 1175 * Set the profiler configuration arguments. If the string contains a '%s' it 1176 * will be replaced with the name of the profiling output file when the task 1177 * runs. 1178 * 1179 * This value is passed to the task child JVM on the command line. 1180 * 1181 * @param value the configuration string 1182 */ 1183 public void setProfileParams(String value) { 1184 ensureState(JobState.DEFINE); 1185 conf.setProfileParams(value); 1186 } 1187 1188 /** 1189 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1190 * must also be called. 1191 * @param newValue a set of integer ranges of the map ids 1192 */ 1193 public void setProfileTaskRange(boolean isMap, String newValue) { 1194 ensureState(JobState.DEFINE); 1195 conf.setProfileTaskRange(isMap, newValue); 1196 } 1197 1198 private void ensureNotSet(String attr, String msg) throws IOException { 1199 if (conf.get(attr) != null) { 1200 throw new IOException(attr + " is incompatible with " + msg + " mode."); 1201 } 1202 } 1203 1204 /** 1205 * Sets the flag that will allow the JobTracker to cancel the HDFS delegation 1206 * tokens upon job completion. Defaults to true. 1207 */ 1208 public void setCancelDelegationTokenUponJobCompletion(boolean value) { 1209 ensureState(JobState.DEFINE); 1210 conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value); 1211 } 1212 1213 /** 1214 * Default to the new APIs unless they are explicitly set or the old mapper or 1215 * reduce attributes are used. 1216 * @throws IOException if the configuration is inconsistant 1217 */ 1218 private void setUseNewAPI() throws IOException { 1219 int numReduces = conf.getNumReduceTasks(); 1220 String oldMapperClass = "mapred.mapper.class"; 1221 String oldReduceClass = "mapred.reducer.class"; 1222 conf.setBooleanIfUnset("mapred.mapper.new-api", 1223 conf.get(oldMapperClass) == null); 1224 if (conf.getUseNewMapper()) { 1225 String mode = "new map API"; 1226 ensureNotSet("mapred.input.format.class", mode); 1227 ensureNotSet(oldMapperClass, mode); 1228 if (numReduces != 0) { 1229 ensureNotSet("mapred.partitioner.class", mode); 1230 } else { 1231 ensureNotSet("mapred.output.format.class", mode); 1232 } 1233 } else { 1234 String mode = "map compatability"; 1235 ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode); 1236 ensureNotSet(MAP_CLASS_ATTR, mode); 1237 if (numReduces != 0) { 1238 ensureNotSet(PARTITIONER_CLASS_ATTR, mode); 1239 } else { 1240 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); 1241 } 1242 } 1243 if (numReduces != 0) { 1244 conf.setBooleanIfUnset("mapred.reducer.new-api", 1245 conf.get(oldReduceClass) == null); 1246 if (conf.getUseNewReducer()) { 1247 String mode = "new reduce API"; 1248 ensureNotSet("mapred.output.format.class", mode); 1249 ensureNotSet(oldReduceClass, mode); 1250 } else { 1251 String mode = "reduce compatability"; 1252 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); 1253 ensureNotSet(REDUCE_CLASS_ATTR, mode); 1254 } 1255 } 1256 } 1257 1258 private synchronized void connect() 1259 throws IOException, InterruptedException, ClassNotFoundException { 1260 if (cluster == null) { 1261 cluster = 1262 ugi.doAs(new PrivilegedExceptionAction<Cluster>() { 1263 public Cluster run() 1264 throws IOException, InterruptedException, 1265 ClassNotFoundException { 1266 return new Cluster(getConfiguration()); 1267 } 1268 }); 1269 } 1270 } 1271 1272 boolean isConnected() { 1273 return cluster != null; 1274 } 1275 1276 /** Only for mocking via unit tests. */ 1277 @Private 1278 public JobSubmitter getJobSubmitter(FileSystem fs, 1279 ClientProtocol submitClient) throws IOException { 1280 return new JobSubmitter(fs, submitClient); 1281 } 1282 /** 1283 * Submit the job to the cluster and return immediately. 1284 * @throws IOException 1285 */ 1286 public void submit() 1287 throws IOException, InterruptedException, ClassNotFoundException { 1288 ensureState(JobState.DEFINE); 1289 setUseNewAPI(); 1290 connect(); 1291 final JobSubmitter submitter = 1292 getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); 1293 status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { 1294 public JobStatus run() throws IOException, InterruptedException, 1295 ClassNotFoundException { 1296 return submitter.submitJobInternal(Job.this, cluster); 1297 } 1298 }); 1299 state = JobState.RUNNING; 1300 LOG.info("The url to track the job: " + getTrackingURL()); 1301 } 1302 1303 /** 1304 * Submit the job to the cluster and wait for it to finish. 1305 * @param verbose print the progress to the user 1306 * @return true if the job succeeded 1307 * @throws IOException thrown if the communication with the 1308 * <code>JobTracker</code> is lost 1309 */ 1310 public boolean waitForCompletion(boolean verbose 1311 ) throws IOException, InterruptedException, 1312 ClassNotFoundException { 1313 if (state == JobState.DEFINE) { 1314 submit(); 1315 } 1316 if (verbose) { 1317 monitorAndPrintJob(); 1318 } else { 1319 // get the completion poll interval from the client. 1320 int completionPollIntervalMillis = 1321 Job.getCompletionPollInterval(cluster.getConf()); 1322 while (!isComplete()) { 1323 try { 1324 Thread.sleep(completionPollIntervalMillis); 1325 } catch (InterruptedException ie) { 1326 } 1327 } 1328 } 1329 return isSuccessful(); 1330 } 1331 1332 /** 1333 * Monitor a job and print status in real-time as progress is made and tasks 1334 * fail. 1335 * @return true if the job succeeded 1336 * @throws IOException if communication to the JobTracker fails 1337 */ 1338 public boolean monitorAndPrintJob() 1339 throws IOException, InterruptedException { 1340 String lastReport = null; 1341 Job.TaskStatusFilter filter; 1342 Configuration clientConf = getConfiguration(); 1343 filter = Job.getTaskOutputFilter(clientConf); 1344 JobID jobId = getJobID(); 1345 LOG.info("Running job: " + jobId); 1346 int eventCounter = 0; 1347 boolean profiling = getProfileEnabled(); 1348 IntegerRanges mapRanges = getProfileTaskRange(true); 1349 IntegerRanges reduceRanges = getProfileTaskRange(false); 1350 int progMonitorPollIntervalMillis = 1351 Job.getProgressPollInterval(clientConf); 1352 /* make sure to report full progress after the job is done */ 1353 boolean reportedAfterCompletion = false; 1354 boolean reportedUberMode = false; 1355 while (!isComplete() || !reportedAfterCompletion) { 1356 if (isComplete()) { 1357 reportedAfterCompletion = true; 1358 } else { 1359 Thread.sleep(progMonitorPollIntervalMillis); 1360 } 1361 if (status.getState() == JobStatus.State.PREP) { 1362 continue; 1363 } 1364 if (!reportedUberMode) { 1365 reportedUberMode = true; 1366 LOG.info("Job " + jobId + " running in uber mode : " + isUber()); 1367 } 1368 String report = 1369 (" map " + StringUtils.formatPercent(mapProgress(), 0)+ 1370 " reduce " + 1371 StringUtils.formatPercent(reduceProgress(), 0)); 1372 if (!report.equals(lastReport)) { 1373 LOG.info(report); 1374 lastReport = report; 1375 } 1376 1377 TaskCompletionEvent[] events = 1378 getTaskCompletionEvents(eventCounter, 10); 1379 eventCounter += events.length; 1380 printTaskEvents(events, filter, profiling, mapRanges, reduceRanges); 1381 } 1382 boolean success = isSuccessful(); 1383 if (success) { 1384 LOG.info("Job " + jobId + " completed successfully"); 1385 } else { 1386 LOG.info("Job " + jobId + " failed with state " + status.getState() + 1387 " due to: " + status.getFailureInfo()); 1388 } 1389 Counters counters = getCounters(); 1390 if (counters != null) { 1391 LOG.info(counters.toString()); 1392 } 1393 return success; 1394 } 1395 1396 /** 1397 * @return true if the profile parameters indicate that this is using 1398 * hprof, which generates profile files in a particular location 1399 * that we can retrieve to the client. 1400 */ 1401 private boolean shouldDownloadProfile() { 1402 // Check the argument string that was used to initialize profiling. 1403 // If this indicates hprof and file-based output, then we're ok to 1404 // download. 1405 String profileParams = getProfileParams(); 1406 1407 if (null == profileParams) { 1408 return false; 1409 } 1410 1411 // Split this on whitespace. 1412 String [] parts = profileParams.split("[ \\t]+"); 1413 1414 // If any of these indicate hprof, and the use of output files, return true. 1415 boolean hprofFound = false; 1416 boolean fileFound = false; 1417 for (String p : parts) { 1418 if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) { 1419 hprofFound = true; 1420 1421 // This contains a number of comma-delimited components, one of which 1422 // may specify the file to write to. Make sure this is present and 1423 // not empty. 1424 String [] subparts = p.split(","); 1425 for (String sub : subparts) { 1426 if (sub.startsWith("file=") && sub.length() != "file=".length()) { 1427 fileFound = true; 1428 } 1429 } 1430 } 1431 } 1432 1433 return hprofFound && fileFound; 1434 } 1435 1436 private void printTaskEvents(TaskCompletionEvent[] events, 1437 Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges, 1438 IntegerRanges reduceRanges) throws IOException, InterruptedException { 1439 for (TaskCompletionEvent event : events) { 1440 switch (filter) { 1441 case NONE: 1442 break; 1443 case SUCCEEDED: 1444 if (event.getStatus() == 1445 TaskCompletionEvent.Status.SUCCEEDED) { 1446 LOG.info(event.toString()); 1447 } 1448 break; 1449 case FAILED: 1450 if (event.getStatus() == 1451 TaskCompletionEvent.Status.FAILED) { 1452 LOG.info(event.toString()); 1453 // Displaying the task diagnostic information 1454 TaskAttemptID taskId = event.getTaskAttemptId(); 1455 String[] taskDiagnostics = getTaskDiagnostics(taskId); 1456 if (taskDiagnostics != null) { 1457 for (String diagnostics : taskDiagnostics) { 1458 System.err.println(diagnostics); 1459 } 1460 } 1461 } 1462 break; 1463 case KILLED: 1464 if (event.getStatus() == TaskCompletionEvent.Status.KILLED){ 1465 LOG.info(event.toString()); 1466 } 1467 break; 1468 case ALL: 1469 LOG.info(event.toString()); 1470 break; 1471 } 1472 } 1473 } 1474 1475 /** The interval at which monitorAndPrintJob() prints status */ 1476 public static int getProgressPollInterval(Configuration conf) { 1477 // Read progress monitor poll interval from config. Default is 1 second. 1478 int progMonitorPollIntervalMillis = conf.getInt( 1479 PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL); 1480 if (progMonitorPollIntervalMillis < 1) { 1481 LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 1482 " has been set to an invalid value; " 1483 + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL); 1484 progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL; 1485 } 1486 return progMonitorPollIntervalMillis; 1487 } 1488 1489 /** The interval at which waitForCompletion() should check. */ 1490 public static int getCompletionPollInterval(Configuration conf) { 1491 int completionPollIntervalMillis = conf.getInt( 1492 COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL); 1493 if (completionPollIntervalMillis < 1) { 1494 LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 1495 " has been set to an invalid value; " 1496 + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL); 1497 completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL; 1498 } 1499 return completionPollIntervalMillis; 1500 } 1501 1502 /** 1503 * Get the task output filter. 1504 * 1505 * @param conf the configuration. 1506 * @return the filter level. 1507 */ 1508 public static TaskStatusFilter getTaskOutputFilter(Configuration conf) { 1509 return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED")); 1510 } 1511 1512 /** 1513 * Modify the Configuration to set the task output filter. 1514 * 1515 * @param conf the Configuration to modify. 1516 * @param newValue the value to set. 1517 */ 1518 public static void setTaskOutputFilter(Configuration conf, 1519 TaskStatusFilter newValue) { 1520 conf.set(Job.OUTPUT_FILTER, newValue.toString()); 1521 } 1522 1523 public boolean isUber() throws IOException, InterruptedException { 1524 ensureState(JobState.RUNNING); 1525 updateStatus(); 1526 return status.isUber(); 1527 } 1528 1529 /** 1530 * Get the reservation to which the job is submitted to, if any 1531 * 1532 * @return the reservationId the identifier of the job's reservation, null if 1533 * the job does not have any reservation associated with it 1534 */ 1535 public ReservationId getReservationId() { 1536 return reservationId; 1537 } 1538 1539 /** 1540 * Set the reservation to which the job is submitted to 1541 * 1542 * @param reservationId the reservationId to set 1543 */ 1544 public void setReservationId(ReservationId reservationId) { 1545 this.reservationId = reservationId; 1546 } 1547 1548 }