/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
      "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
      "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
      "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
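   *
   * For example (an illustrative sketch; the property name is arbitrary),
   * changes made to the passed-in <code>Configuration</code> after this call
   * are not visible to the job, because of the copy:
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     conf.set("example.property", "before");
   *     Job job = Job.getInstance(conf);
   *     conf.set("example.property", "after");
   *     // the job's copy still returns "before"
   *     job.getConfiguration().get("example.property");
   * </pre></blockquote></p>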
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given
   * jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job name to set
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the ignored cluster
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the ignored cluster
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get priority info of the job.
   *
   * @return the priority info of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * Get the user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump job stats to a human-readable string.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: report whatever status information is available
    } catch (InterruptedException ie) {
      // ignore: report whatever status information is available
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    // count the failed completion events; lastEvent remembers the most recent
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason, which "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
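   * For example (a sketch), the value can be rendered as a percentage with
   * {@link StringUtils#formatPercent(double, int)}, as
   * {@link #monitorAndPrintJob()} does:
   * <p><blockquote><pre>
   *     System.out.println("map " +
   *         StringUtils.formatPercent(job.mapProgress(), 0));
   * </pre></blockquote></p>
   *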
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
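   * For example (a sketch; the poll interval is arbitrary and interrupt
   * handling is elided), a client may poll directly instead of using
   * {@link #waitForCompletion(boolean)}:
   * <p><blockquote><pre>
   *     while (!job.isComplete()) {
   *       Thread.sleep(5000);
   *     }
   * </pre></blockquote></p>
   *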
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job, either before submission (stored in the
   * configuration) or while it is running (forwarded to the cluster).
   *
   * @param jobPriority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
        conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
      } else {
        conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
            .valueOf(jobPriority.name()));
      }
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = convertPriorityToInteger(jobPriority);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  /**
   * Set the priority of the job as an integer.
   *
   * @param jobPriority
   *          the new priority for the job.
   * @throws IOException
   */
  public void setPriorityAsInteger(int jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriorityAsInteger(jobPriority);
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = jobPriority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  private int convertPriorityToInteger(JobPriority jobPriority) {
    switch (jobPriority) {
    case VERY_HIGH :
      return 5;
    case HIGH :
      return 4;
    case NORMAL :
      return 3;
    case LOW :
      return 2;
    case VERY_LOW :
      return 1;
    case DEFAULT :
      return 0;
    default:
      break;
    }
    // For UNDEFINED_PRIORITY, we can set it to default for better handling
    return 0;
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
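   * A typical use is to page through the events while the job runs (a
   * sketch; the page size and sleep interval are arbitrary):
   * <p><blockquote><pre>
   *     int from = 0;
   *     while (!job.isComplete()) {
   *       TaskCompletionEvent[] events = job.getTaskCompletionEvents(from, 10);
   *       from += events.length;
   *       // inspect the fetched events here
   *       Thread.sleep(1000);
   *     }
   * </pre></blockquote></p>
   *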
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
        InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
        OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
        Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
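   *
   * For example (a sketch), a job whose map tasks emit
   * <code>Text</code>/<code>IntWritable</code> pairs while its final output
   * is <code>Text</code>/<code>Text</code> would be configured as:
   * <p><blockquote><pre>
   *     job.setMapOutputKeyClass(Text.class);
   *     job.setMapOutputValueClass(IntWritable.class);
   *     job.setOutputKeyClass(Text.class);
   *     job.setOutputValueClass(Text.class);
   * </pre></blockquote></p>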
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner,
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
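   *
   * A common use is secondary sort (a sketch; the two comparator classes
   * named here are hypothetical user-supplied {@link RawComparator}s over a
   * composite key):
   * <p><blockquote><pre>
   *     job.setSortComparatorClass(CompositeKeyComparator.class);
   *     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>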
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
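   *
   * For example (a sketch; the path is illustrative), a URI fragment can be
   * used to name the symlink created for the file in the task's working
   * directory:
   * <p><blockquote><pre>
   *     job.addCacheFile(new URI("/user/me/lookup.dat#lookup"));
   * </pre></blockquote></p>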
   *
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf,
        archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
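   *
   * For example (a sketch; the range is illustrative and uses the
   * {@link Configuration.IntegerRanges} syntax):
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     job.setProfileTaskRange(true, "0-2");  // profile map tasks 0, 1 and 2
   * </pre></blockquote></p>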
   *
   * @param isMap whether the ranges apply to map tasks (otherwise reduce tasks)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
        conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
          conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run()
                throws IOException, InterruptedException,
                       ClassNotFoundException {
              return new Cluster(getConfiguration());
            }
          });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   *
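   * With <code>verbose</code> set to false, this is equivalent to the
   * following sketch (interrupt handling elided):
   * <p><blockquote><pre>
   *     job.submit();
   *     while (!job.isComplete()) {
   *       Thread.sleep(Job.getCompletionPollInterval(job.getConfiguration()));
   *     }
   *     boolean succeeded = job.isSuccessful();
   * </pre></blockquote></p>
   *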
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
      ) throws IOException, InterruptedException,
               ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; " +
          "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /**
   * The interval at which waitForCompletion() should check.
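   *
   * For example, to poll every 10 seconds (an illustrative value; the
   * default is 5000 ms), set the key in mapred-site.xml:
   * <p><blockquote><pre>
   *     &lt;property&gt;
   *       &lt;name&gt;mapreduce.client.completion.pollinterval&lt;/name&gt;
   *       &lt;value&gt;10000&lt;/value&gt;
   *     &lt;/property&gt;
   * </pre></blockquote></p>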
   */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; " +
          "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId the identifier of the job's reservation, null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}