001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.IOException; 022import java.net.URI; 023import java.security.PrivilegedExceptionAction; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.classification.InterfaceAudience.Private; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.conf.Configuration.IntegerRanges; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.io.RawComparator; 035import org.apache.hadoop.mapred.JobConf; 036import org.apache.hadoop.mapreduce.filecache.DistributedCache; 037import org.apache.hadoop.mapreduce.protocol.ClientProtocol; 038import org.apache.hadoop.mapreduce.task.JobContextImpl; 039import org.apache.hadoop.mapreduce.util.ConfigUtil; 040import org.apache.hadoop.util.StringUtils; 041import org.apache.hadoop.yarn.api.records.ReservationId; 042 043/** 044 * The job submitter's view of the Job. 045 * 046 * <p>It allows the user to configure the 047 * job, submit it, control its execution, and query the state. The set methods 048 * only work until the job is submitted, afterwards they will throw an 049 * IllegalStateException. </p> 050 * 051 * <p> 052 * Normally the user creates the application, describes various facets of the 053 * job via {@link Job} and then submits the job and monitor its progress.</p> 054 * 055 * <p>Here is an example on how to submit a job:</p> 056 * <p><blockquote><pre> 057 * // Create a new Job 058 * Job job = Job.getInstance(); 059 * job.setJarByClass(MyJob.class); 060 * 061 * // Specify various job-specific parameters 062 * job.setJobName("myjob"); 063 * 064 * job.setInputPath(new Path("in")); 065 * job.setOutputPath(new Path("out")); 066 * 067 * job.setMapperClass(MyJob.MyMapper.class); 068 * job.setReducerClass(MyJob.MyReducer.class); 069 * 070 * // Submit the job, then poll for progress until the job is complete 071 * job.waitForCompletion(true); 072 * </pre></blockquote></p> 073 * 074 * 075 */ 076@InterfaceAudience.Public 077@InterfaceStability.Evolving 078public class Job extends JobContextImpl implements JobContext { 079 private static final Log LOG = LogFactory.getLog(Job.class); 080 081 @InterfaceStability.Evolving 082 public static enum JobState {DEFINE, RUNNING}; 083 private static final long MAX_JOBSTATUS_AGE = 1000 * 2; 084 public static final String OUTPUT_FILTER = "mapreduce.client.output.filter"; 085 /** Key in mapred-*.xml that sets completionPollInvervalMillis */ 086 public static final String COMPLETION_POLL_INTERVAL_KEY = 087 "mapreduce.client.completion.pollinterval"; 088 089 /** Default completionPollIntervalMillis is 5000 ms. */ 090 static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; 091 /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */ 092 public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = 093 "mapreduce.client.progressmonitor.pollinterval"; 094 /** Default progMonitorPollIntervalMillis is 1000 ms. */ 095 static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; 096 097 public static final String USED_GENERIC_PARSER = 098 "mapreduce.client.genericoptionsparser.used"; 099 public static final String SUBMIT_REPLICATION = 100 "mapreduce.client.submit.file.replication"; 101 private static final String TASKLOG_PULL_TIMEOUT_KEY = 102 "mapreduce.client.tasklog.timeout"; 103 private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; 104 public static final int DEFAULT_SUBMIT_REPLICATION = 10; 105 106 @InterfaceStability.Evolving 107 public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } 108 109 static { 110 ConfigUtil.loadResources(); 111 } 112 113 private JobState state = JobState.DEFINE; 114 private JobStatus status; 115 private long statustime; 116 private Cluster cluster; 117 private ReservationId reservationId; 118 119 /** 120 * @deprecated Use {@link #getInstance()} 121 */ 122 @Deprecated 123 public Job() throws IOException { 124 this(new Configuration()); 125 } 126 127 /** 128 * @deprecated Use {@link #getInstance(Configuration)} 129 */ 130 @Deprecated 131 public Job(Configuration conf) throws IOException { 132 this(new JobConf(conf)); 133 } 134 135 /** 136 * @deprecated Use {@link #getInstance(Configuration, String)} 137 */ 138 @Deprecated 139 public Job(Configuration conf, String jobName) throws IOException { 140 this(conf); 141 setJobName(jobName); 142 } 143 144 Job(JobConf conf) throws IOException { 145 super(conf, null); 146 // propagate existing user credentials to job 147 this.credentials.mergeAll(this.ugi.getCredentials()); 148 this.cluster = null; 149 } 150 151 Job(JobStatus status, JobConf conf) throws IOException { 152 this(conf); 153 setJobID(status.getJobID()); 154 this.status = status; 155 state = JobState.RUNNING; 156 } 157 158 159 /** 160 * Creates a new {@link Job} with no particular {@link Cluster} . 161 * A Cluster will be created with a generic {@link Configuration}. 162 * 163 * @return the {@link Job} , with no connection to a cluster yet. 164 * @throws IOException 165 */ 166 public static Job getInstance() throws IOException { 167 // create with a null Cluster 168 return getInstance(new Configuration()); 169 } 170 171 /** 172 * Creates a new {@link Job} with no particular {@link Cluster} and a 173 * given {@link Configuration}. 174 * 175 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 176 * that any necessary internal modifications do not reflect on the incoming 177 * parameter. 178 * 179 * A Cluster will be created from the conf parameter only when it's needed. 180 * 181 * @param conf the configuration 182 * @return the {@link Job} , with no connection to a cluster yet. 183 * @throws IOException 184 */ 185 public static Job getInstance(Configuration conf) throws IOException { 186 // create with a null Cluster 187 JobConf jobConf = new JobConf(conf); 188 return new Job(jobConf); 189 } 190 191 192 /** 193 * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName. 194 * A Cluster will be created from the conf parameter only when it's needed. 195 * 196 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 197 * that any necessary internal modifications do not reflect on the incoming 198 * parameter. 199 * 200 * @param conf the configuration 201 * @return the {@link Job} , with no connection to a cluster yet. 202 * @throws IOException 203 */ 204 public static Job getInstance(Configuration conf, String jobName) 205 throws IOException { 206 // create with a null Cluster 207 Job result = getInstance(conf); 208 result.setJobName(jobName); 209 return result; 210 } 211 212 /** 213 * Creates a new {@link Job} with no particular {@link Cluster} and given 214 * {@link Configuration} and {@link JobStatus}. 215 * A Cluster will be created from the conf parameter only when it's needed. 216 * 217 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 218 * that any necessary internal modifications do not reflect on the incoming 219 * parameter. 220 * 221 * @param status job status 222 * @param conf job configuration 223 * @return the {@link Job} , with no connection to a cluster yet. 224 * @throws IOException 225 */ 226 public static Job getInstance(JobStatus status, Configuration conf) 227 throws IOException { 228 return new Job(status, new JobConf(conf)); 229 } 230 231 /** 232 * Creates a new {@link Job} with no particular {@link Cluster}. 233 * A Cluster will be created from the conf parameter only when it's needed. 234 * 235 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 236 * that any necessary internal modifications do not reflect on the incoming 237 * parameter. 238 * 239 * @param ignored 240 * @return the {@link Job} , with no connection to a cluster yet. 241 * @throws IOException 242 * @deprecated Use {@link #getInstance()} 243 */ 244 @Deprecated 245 public static Job getInstance(Cluster ignored) throws IOException { 246 return getInstance(); 247 } 248 249 /** 250 * Creates a new {@link Job} with no particular {@link Cluster} and given 251 * {@link Configuration}. 252 * A Cluster will be created from the conf parameter only when it's needed. 253 * 254 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 255 * that any necessary internal modifications do not reflect on the incoming 256 * parameter. 257 * 258 * @param ignored 259 * @param conf job configuration 260 * @return the {@link Job} , with no connection to a cluster yet. 261 * @throws IOException 262 * @deprecated Use {@link #getInstance(Configuration)} 263 */ 264 @Deprecated 265 public static Job getInstance(Cluster ignored, Configuration conf) 266 throws IOException { 267 return getInstance(conf); 268 } 269 270 /** 271 * Creates a new {@link Job} with no particular {@link Cluster} and given 272 * {@link Configuration} and {@link JobStatus}. 273 * A Cluster will be created from the conf parameter only when it's needed. 274 * 275 * The <code>Job</code> makes a copy of the <code>Configuration</code> so 276 * that any necessary internal modifications do not reflect on the incoming 277 * parameter. 278 * 279 * @param cluster cluster 280 * @param status job status 281 * @param conf job configuration 282 * @return the {@link Job} , with no connection to a cluster yet. 283 * @throws IOException 284 */ 285 @Private 286 public static Job getInstance(Cluster cluster, JobStatus status, 287 Configuration conf) throws IOException { 288 Job job = getInstance(status, conf); 289 job.setCluster(cluster); 290 return job; 291 } 292 293 private void ensureState(JobState state) throws IllegalStateException { 294 if (state != this.state) { 295 throw new IllegalStateException("Job in state "+ this.state + 296 " instead of " + state); 297 } 298 299 if (state == JobState.RUNNING && cluster == null) { 300 throw new IllegalStateException 301 ("Job in state " + this.state 302 + ", but it isn't attached to any job tracker!"); 303 } 304 } 305 306 /** 307 * Some methods rely on having a recent job status object. Refresh 308 * it, if necessary 309 */ 310 synchronized void ensureFreshStatus() 311 throws IOException { 312 if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) { 313 updateStatus(); 314 } 315 } 316 317 /** Some methods need to update status immediately. So, refresh 318 * immediately 319 * @throws IOException 320 */ 321 synchronized void updateStatus() throws IOException { 322 try { 323 this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { 324 @Override 325 public JobStatus run() throws IOException, InterruptedException { 326 return cluster.getClient().getJobStatus(status.getJobID()); 327 } 328 }); 329 } 330 catch (InterruptedException ie) { 331 throw new IOException(ie); 332 } 333 if (this.status == null) { 334 throw new IOException("Job status not available "); 335 } 336 this.statustime = System.currentTimeMillis(); 337 } 338 339 public JobStatus getStatus() throws IOException, InterruptedException { 340 ensureState(JobState.RUNNING); 341 updateStatus(); 342 return status; 343 } 344 345 private void setStatus(JobStatus status) { 346 this.status = status; 347 } 348 349 /** 350 * Returns the current state of the Job. 351 * 352 * @return JobStatus#State 353 * @throws IOException 354 * @throws InterruptedException 355 */ 356 public JobStatus.State getJobState() 357 throws IOException, InterruptedException { 358 ensureState(JobState.RUNNING); 359 updateStatus(); 360 return status.getState(); 361 } 362 363 /** 364 * Get the URL where some job progress information will be displayed. 365 * 366 * @return the URL where some job progress information will be displayed. 367 */ 368 public String getTrackingURL(){ 369 ensureState(JobState.RUNNING); 370 return status.getTrackingUrl().toString(); 371 } 372 373 /** 374 * Get the path of the submitted job configuration. 375 * 376 * @return the path of the submitted job configuration. 377 */ 378 public String getJobFile() { 379 ensureState(JobState.RUNNING); 380 return status.getJobFile(); 381 } 382 383 /** 384 * Get start time of the job. 385 * 386 * @return the start time of the job 387 */ 388 public long getStartTime() { 389 ensureState(JobState.RUNNING); 390 return status.getStartTime(); 391 } 392 393 /** 394 * Get finish time of the job. 395 * 396 * @return the finish time of the job 397 */ 398 public long getFinishTime() throws IOException, InterruptedException { 399 ensureState(JobState.RUNNING); 400 updateStatus(); 401 return status.getFinishTime(); 402 } 403 404 /** 405 * Get scheduling info of the job. 406 * 407 * @return the scheduling info of the job 408 */ 409 public String getSchedulingInfo() { 410 ensureState(JobState.RUNNING); 411 return status.getSchedulingInfo(); 412 } 413 414 /** 415 * Get scheduling info of the job. 416 * 417 * @return the scheduling info of the job 418 */ 419 public JobPriority getPriority() throws IOException, InterruptedException { 420 ensureState(JobState.RUNNING); 421 updateStatus(); 422 return status.getPriority(); 423 } 424 425 /** 426 * The user-specified job name. 427 */ 428 public String getJobName() { 429 if (state == JobState.DEFINE) { 430 return super.getJobName(); 431 } 432 ensureState(JobState.RUNNING); 433 return status.getJobName(); 434 } 435 436 public String getHistoryUrl() throws IOException, InterruptedException { 437 ensureState(JobState.RUNNING); 438 updateStatus(); 439 return status.getHistoryFile(); 440 } 441 442 public boolean isRetired() throws IOException, InterruptedException { 443 ensureState(JobState.RUNNING); 444 updateStatus(); 445 return status.isRetired(); 446 } 447 448 @Private 449 public Cluster getCluster() { 450 return cluster; 451 } 452 453 /** Only for mocks in unit tests. */ 454 @Private 455 private void setCluster(Cluster cluster) { 456 this.cluster = cluster; 457 } 458 459 /** 460 * Dump stats to screen. 461 */ 462 @Override 463 public String toString() { 464 ensureState(JobState.RUNNING); 465 String reasonforFailure = " "; 466 int numMaps = 0; 467 int numReduces = 0; 468 try { 469 updateStatus(); 470 if (status.getState().equals(JobStatus.State.FAILED)) 471 reasonforFailure = getTaskFailureEventString(); 472 numMaps = getTaskReports(TaskType.MAP).length; 473 numReduces = getTaskReports(TaskType.REDUCE).length; 474 } catch (IOException e) { 475 } catch (InterruptedException ie) { 476 } 477 StringBuffer sb = new StringBuffer(); 478 sb.append("Job: ").append(status.getJobID()).append("\n"); 479 sb.append("Job File: ").append(status.getJobFile()).append("\n"); 480 sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); 481 sb.append("\n"); 482 sb.append("Uber job : ").append(status.isUber()).append("\n"); 483 sb.append("Number of maps: ").append(numMaps).append("\n"); 484 sb.append("Number of reduces: ").append(numReduces).append("\n"); 485 sb.append("map() completion: "); 486 sb.append(status.getMapProgress()).append("\n"); 487 sb.append("reduce() completion: "); 488 sb.append(status.getReduceProgress()).append("\n"); 489 sb.append("Job state: "); 490 sb.append(status.getState()).append("\n"); 491 sb.append("retired: ").append(status.isRetired()).append("\n"); 492 sb.append("reason for failure: ").append(reasonforFailure); 493 return sb.toString(); 494 } 495 496 /** 497 * @return taskid which caused job failure 498 * @throws IOException 499 * @throws InterruptedException 500 */ 501 String getTaskFailureEventString() throws IOException, 502 InterruptedException { 503 int failCount = 1; 504 TaskCompletionEvent lastEvent = null; 505 TaskCompletionEvent[] events = ugi.doAs(new 506 PrivilegedExceptionAction<TaskCompletionEvent[]>() { 507 @Override 508 public TaskCompletionEvent[] run() throws IOException, 509 InterruptedException { 510 return cluster.getClient().getTaskCompletionEvents( 511 status.getJobID(), 0, 10); 512 } 513 }); 514 for (TaskCompletionEvent event : events) { 515 if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) { 516 failCount++; 517 lastEvent = event; 518 } 519 } 520 if (lastEvent == null) { 521 return "There are no failed tasks for the job. " 522 + "Job is failed due to some other reason and reason " 523 + "can be found in the logs."; 524 } 525 String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2); 526 String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2); 527 return (" task " + taskID + " failed " + 528 failCount + " times " + "For details check tasktracker at: " + 529 lastEvent.getTaskTrackerHttp()); 530 } 531 532 /** 533 * Get the information of the current state of the tasks of a job. 534 * 535 * @param type Type of the task 536 * @return the list of all of the map tips. 537 * @throws IOException 538 */ 539 public TaskReport[] getTaskReports(TaskType type) 540 throws IOException, InterruptedException { 541 ensureState(JobState.RUNNING); 542 final TaskType tmpType = type; 543 return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() { 544 public TaskReport[] run() throws IOException, InterruptedException { 545 return cluster.getClient().getTaskReports(getJobID(), tmpType); 546 } 547 }); 548 } 549 550 /** 551 * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 552 * and 1.0. When all map tasks have completed, the function returns 1.0. 553 * 554 * @return the progress of the job's map-tasks. 555 * @throws IOException 556 */ 557 public float mapProgress() throws IOException { 558 ensureState(JobState.RUNNING); 559 ensureFreshStatus(); 560 return status.getMapProgress(); 561 } 562 563 /** 564 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 565 * and 1.0. When all reduce tasks have completed, the function returns 1.0. 566 * 567 * @return the progress of the job's reduce-tasks. 568 * @throws IOException 569 */ 570 public float reduceProgress() throws IOException { 571 ensureState(JobState.RUNNING); 572 ensureFreshStatus(); 573 return status.getReduceProgress(); 574 } 575 576 /** 577 * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 578 * and 1.0. When all cleanup tasks have completed, the function returns 1.0. 579 * 580 * @return the progress of the job's cleanup-tasks. 581 * @throws IOException 582 */ 583 public float cleanupProgress() throws IOException, InterruptedException { 584 ensureState(JobState.RUNNING); 585 ensureFreshStatus(); 586 return status.getCleanupProgress(); 587 } 588 589 /** 590 * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 591 * and 1.0. When all setup tasks have completed, the function returns 1.0. 592 * 593 * @return the progress of the job's setup-tasks. 594 * @throws IOException 595 */ 596 public float setupProgress() throws IOException { 597 ensureState(JobState.RUNNING); 598 ensureFreshStatus(); 599 return status.getSetupProgress(); 600 } 601 602 /** 603 * Check if the job is finished or not. 604 * This is a non-blocking call. 605 * 606 * @return <code>true</code> if the job is complete, else <code>false</code>. 607 * @throws IOException 608 */ 609 public boolean isComplete() throws IOException { 610 ensureState(JobState.RUNNING); 611 updateStatus(); 612 return status.isJobComplete(); 613 } 614 615 /** 616 * Check if the job completed successfully. 617 * 618 * @return <code>true</code> if the job succeeded, else <code>false</code>. 619 * @throws IOException 620 */ 621 public boolean isSuccessful() throws IOException { 622 ensureState(JobState.RUNNING); 623 updateStatus(); 624 return status.getState() == JobStatus.State.SUCCEEDED; 625 } 626 627 /** 628 * Kill the running job. Blocks until all job tasks have been 629 * killed as well. If the job is no longer running, it simply returns. 630 * 631 * @throws IOException 632 */ 633 public void killJob() throws IOException { 634 ensureState(JobState.RUNNING); 635 try { 636 cluster.getClient().killJob(getJobID()); 637 } 638 catch (InterruptedException ie) { 639 throw new IOException(ie); 640 } 641 } 642 643 /** 644 * Set the priority of a running job. 645 * @param priority the new priority for the job. 646 * @throws IOException 647 */ 648 public void setPriority(JobPriority priority) 649 throws IOException, InterruptedException { 650 if (state == JobState.DEFINE) { 651 conf.setJobPriority( 652 org.apache.hadoop.mapred.JobPriority.valueOf(priority.name())); 653 } else { 654 ensureState(JobState.RUNNING); 655 final JobPriority tmpPriority = priority; 656 ugi.doAs(new PrivilegedExceptionAction<Object>() { 657 @Override 658 public Object run() throws IOException, InterruptedException { 659 cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString()); 660 return null; 661 } 662 }); 663 } 664 } 665 666 /** 667 * Get events indicating completion (success/failure) of component tasks. 668 * 669 * @param startFrom index to start fetching events from 670 * @param numEvents number of events to fetch 671 * @return an array of {@link TaskCompletionEvent}s 672 * @throws IOException 673 */ 674 public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom, 675 final int numEvents) throws IOException, InterruptedException { 676 ensureState(JobState.RUNNING); 677 return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() { 678 @Override 679 public TaskCompletionEvent[] run() throws IOException, InterruptedException { 680 return cluster.getClient().getTaskCompletionEvents(getJobID(), 681 startFrom, numEvents); 682 } 683 }); 684 } 685 686 /** 687 * Get events indicating completion (success/failure) of component tasks. 688 * 689 * @param startFrom index to start fetching events from 690 * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s 691 * @throws IOException 692 */ 693 public org.apache.hadoop.mapred.TaskCompletionEvent[] 694 getTaskCompletionEvents(final int startFrom) throws IOException { 695 try { 696 TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10); 697 org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents = 698 new org.apache.hadoop.mapred.TaskCompletionEvent[events.length]; 699 for (int i = 0; i < events.length; i++) { 700 retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade 701 (events[i]); 702 } 703 return retEvents; 704 } catch (InterruptedException ie) { 705 throw new IOException(ie); 706 } 707 } 708 709 /** 710 * Kill indicated task attempt. 711 * @param taskId the id of the task to kill. 712 * @param shouldFail if <code>true</code> the task is failed and added 713 * to failed tasks list, otherwise it is just killed, 714 * w/o affecting job failure status. 715 */ 716 @Private 717 public boolean killTask(final TaskAttemptID taskId, 718 final boolean shouldFail) throws IOException { 719 ensureState(JobState.RUNNING); 720 try { 721 return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { 722 public Boolean run() throws IOException, InterruptedException { 723 return cluster.getClient().killTask(taskId, shouldFail); 724 } 725 }); 726 } 727 catch (InterruptedException ie) { 728 throw new IOException(ie); 729 } 730 } 731 732 /** 733 * Kill indicated task attempt. 734 * 735 * @param taskId the id of the task to be terminated. 736 * @throws IOException 737 */ 738 public void killTask(final TaskAttemptID taskId) 739 throws IOException { 740 killTask(taskId, false); 741 } 742 743 /** 744 * Fail indicated task attempt. 745 * 746 * @param taskId the id of the task to be terminated. 747 * @throws IOException 748 */ 749 public void failTask(final TaskAttemptID taskId) 750 throws IOException { 751 killTask(taskId, true); 752 } 753 754 /** 755 * Gets the counters for this job. May return null if the job has been 756 * retired and the job is no longer in the completed job store. 757 * 758 * @return the counters for this job. 759 * @throws IOException 760 */ 761 public Counters getCounters() 762 throws IOException { 763 ensureState(JobState.RUNNING); 764 try { 765 return ugi.doAs(new PrivilegedExceptionAction<Counters>() { 766 @Override 767 public Counters run() throws IOException, InterruptedException { 768 return cluster.getClient().getJobCounters(getJobID()); 769 } 770 }); 771 } 772 catch (InterruptedException ie) { 773 throw new IOException(ie); 774 } 775 } 776 777 /** 778 * Gets the diagnostic messages for a given task attempt. 779 * @param taskid 780 * @return the list of diagnostic messages for the task 781 * @throws IOException 782 */ 783 public String[] getTaskDiagnostics(final TaskAttemptID taskid) 784 throws IOException, InterruptedException { 785 ensureState(JobState.RUNNING); 786 return ugi.doAs(new PrivilegedExceptionAction<String[]>() { 787 @Override 788 public String[] run() throws IOException, InterruptedException { 789 return cluster.getClient().getTaskDiagnostics(taskid); 790 } 791 }); 792 } 793 794 /** 795 * Set the number of reduce tasks for the job. 796 * @param tasks the number of reduce tasks 797 * @throws IllegalStateException if the job is submitted 798 */ 799 public void setNumReduceTasks(int tasks) throws IllegalStateException { 800 ensureState(JobState.DEFINE); 801 conf.setNumReduceTasks(tasks); 802 } 803 804 /** 805 * Set the current working directory for the default file system. 806 * 807 * @param dir the new current working directory. 808 * @throws IllegalStateException if the job is submitted 809 */ 810 public void setWorkingDirectory(Path dir) throws IOException { 811 ensureState(JobState.DEFINE); 812 conf.setWorkingDirectory(dir); 813 } 814 815 /** 816 * Set the {@link InputFormat} for the job. 817 * @param cls the <code>InputFormat</code> to use 818 * @throws IllegalStateException if the job is submitted 819 */ 820 public void setInputFormatClass(Class<? extends InputFormat> cls 821 ) throws IllegalStateException { 822 ensureState(JobState.DEFINE); 823 conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 824 InputFormat.class); 825 } 826 827 /** 828 * Set the {@link OutputFormat} for the job. 829 * @param cls the <code>OutputFormat</code> to use 830 * @throws IllegalStateException if the job is submitted 831 */ 832 public void setOutputFormatClass(Class<? extends OutputFormat> cls 833 ) throws IllegalStateException { 834 ensureState(JobState.DEFINE); 835 conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 836 OutputFormat.class); 837 } 838 839 /** 840 * Set the {@link Mapper} for the job. 841 * @param cls the <code>Mapper</code> to use 842 * @throws IllegalStateException if the job is submitted 843 */ 844 public void setMapperClass(Class<? extends Mapper> cls 845 ) throws IllegalStateException { 846 ensureState(JobState.DEFINE); 847 conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class); 848 } 849 850 /** 851 * Set the Jar by finding where a given class came from. 852 * @param cls the example class 853 */ 854 public void setJarByClass(Class<?> cls) { 855 ensureState(JobState.DEFINE); 856 conf.setJarByClass(cls); 857 } 858 859 /** 860 * Set the job jar 861 */ 862 public void setJar(String jar) { 863 ensureState(JobState.DEFINE); 864 conf.setJar(jar); 865 } 866 867 /** 868 * Set the reported username for this job. 869 * 870 * @param user the username for this job. 871 */ 872 public void setUser(String user) { 873 ensureState(JobState.DEFINE); 874 conf.setUser(user); 875 } 876 877 /** 878 * Set the combiner class for the job. 879 * @param cls the combiner to use 880 * @throws IllegalStateException if the job is submitted 881 */ 882 public void setCombinerClass(Class<? extends Reducer> cls 883 ) throws IllegalStateException { 884 ensureState(JobState.DEFINE); 885 conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); 886 } 887 888 /** 889 * Set the {@link Reducer} for the job. 890 * @param cls the <code>Reducer</code> to use 891 * @throws IllegalStateException if the job is submitted 892 */ 893 public void setReducerClass(Class<? extends Reducer> cls 894 ) throws IllegalStateException { 895 ensureState(JobState.DEFINE); 896 conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class); 897 } 898 899 /** 900 * Set the {@link Partitioner} for the job. 901 * @param cls the <code>Partitioner</code> to use 902 * @throws IllegalStateException if the job is submitted 903 */ 904 public void setPartitionerClass(Class<? extends Partitioner> cls 905 ) throws IllegalStateException { 906 ensureState(JobState.DEFINE); 907 conf.setClass(PARTITIONER_CLASS_ATTR, cls, 908 Partitioner.class); 909 } 910 911 /** 912 * Set the key class for the map output data. This allows the user to 913 * specify the map output key class to be different than the final output 914 * value class. 915 * 916 * @param theClass the map output key class. 917 * @throws IllegalStateException if the job is submitted 918 */ 919 public void setMapOutputKeyClass(Class<?> theClass 920 ) throws IllegalStateException { 921 ensureState(JobState.DEFINE); 922 conf.setMapOutputKeyClass(theClass); 923 } 924 925 /** 926 * Set the value class for the map output data. This allows the user to 927 * specify the map output value class to be different than the final output 928 * value class. 929 * 930 * @param theClass the map output value class. 931 * @throws IllegalStateException if the job is submitted 932 */ 933 public void setMapOutputValueClass(Class<?> theClass 934 ) throws IllegalStateException { 935 ensureState(JobState.DEFINE); 936 conf.setMapOutputValueClass(theClass); 937 } 938 939 /** 940 * Set the key class for the job output data. 941 * 942 * @param theClass the key class for the job output data. 943 * @throws IllegalStateException if the job is submitted 944 */ 945 public void setOutputKeyClass(Class<?> theClass 946 ) throws IllegalStateException { 947 ensureState(JobState.DEFINE); 948 conf.setOutputKeyClass(theClass); 949 } 950 951 /** 952 * Set the value class for job outputs. 953 * 954 * @param theClass the value class for job outputs. 955 * @throws IllegalStateException if the job is submitted 956 */ 957 public void setOutputValueClass(Class<?> theClass 958 ) throws IllegalStateException { 959 ensureState(JobState.DEFINE); 960 conf.setOutputValueClass(theClass); 961 } 962 963 /** 964 * Define the comparator that controls which keys are grouped together 965 * for a single call to combiner, 966 * {@link Reducer#reduce(Object, Iterable, 967 * org.apache.hadoop.mapreduce.Reducer.Context)} 968 * 969 * @param cls the raw comparator to use 970 * @throws IllegalStateException if the job is submitted 971 */ 972 public void setCombinerKeyGroupingComparatorClass( 973 Class<? extends RawComparator> cls) throws IllegalStateException { 974 ensureState(JobState.DEFINE); 975 conf.setCombinerKeyGroupingComparator(cls); 976 } 977 978 /** 979 * Define the comparator that controls how the keys are sorted before they 980 * are passed to the {@link Reducer}. 981 * @param cls the raw comparator 982 * @throws IllegalStateException if the job is submitted 983 * @see #setCombinerKeyGroupingComparatorClass(Class) 984 */ 985 public void setSortComparatorClass(Class<? extends RawComparator> cls 986 ) throws IllegalStateException { 987 ensureState(JobState.DEFINE); 988 conf.setOutputKeyComparatorClass(cls); 989 } 990 991 /** 992 * Define the comparator that controls which keys are grouped together 993 * for a single call to 994 * {@link Reducer#reduce(Object, Iterable, 995 * org.apache.hadoop.mapreduce.Reducer.Context)} 996 * @param cls the raw comparator to use 997 * @throws IllegalStateException if the job is submitted 998 * @see #setCombinerKeyGroupingComparatorClass(Class) 999 */ 1000 public void setGroupingComparatorClass(Class<? extends RawComparator> cls 1001 ) throws IllegalStateException { 1002 ensureState(JobState.DEFINE); 1003 conf.setOutputValueGroupingComparator(cls); 1004 } 1005 1006 /** 1007 * Set the user-specified job name. 1008 * 1009 * @param name the job's new name. 1010 * @throws IllegalStateException if the job is submitted 1011 */ 1012 public void setJobName(String name) throws IllegalStateException { 1013 ensureState(JobState.DEFINE); 1014 conf.setJobName(name); 1015 } 1016 1017 /** 1018 * Turn speculative execution on or off for this job. 1019 * 1020 * @param speculativeExecution <code>true</code> if speculative execution 1021 * should be turned on, else <code>false</code>. 1022 */ 1023 public void setSpeculativeExecution(boolean speculativeExecution) { 1024 ensureState(JobState.DEFINE); 1025 conf.setSpeculativeExecution(speculativeExecution); 1026 } 1027 1028 /** 1029 * Turn speculative execution on or off for this job for map tasks. 1030 * 1031 * @param speculativeExecution <code>true</code> if speculative execution 1032 * should be turned on for map tasks, 1033 * else <code>false</code>. 1034 */ 1035 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1036 ensureState(JobState.DEFINE); 1037 conf.setMapSpeculativeExecution(speculativeExecution); 1038 } 1039 1040 /** 1041 * Turn speculative execution on or off for this job for reduce tasks. 1042 * 1043 * @param speculativeExecution <code>true</code> if speculative execution 1044 * should be turned on for reduce tasks, 1045 * else <code>false</code>. 1046 */ 1047 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1048 ensureState(JobState.DEFINE); 1049 conf.setReduceSpeculativeExecution(speculativeExecution); 1050 } 1051 1052 /** 1053 * Specify whether job-setup and job-cleanup is needed for the job 1054 * 1055 * @param needed If <code>true</code>, job-setup and job-cleanup will be 1056 * considered from {@link OutputCommitter} 1057 * else ignored. 1058 */ 1059 public void setJobSetupCleanupNeeded(boolean needed) { 1060 ensureState(JobState.DEFINE); 1061 conf.setBoolean(SETUP_CLEANUP_NEEDED, needed); 1062 } 1063 1064 /** 1065 * Set the given set of archives 1066 * @param archives The list of archives that need to be localized 1067 */ 1068 public void setCacheArchives(URI[] archives) { 1069 ensureState(JobState.DEFINE); 1070 DistributedCache.setCacheArchives(archives, conf); 1071 } 1072 1073 /** 1074 * Set the given set of files 1075 * @param files The list of files that need to be localized 1076 */ 1077 public void setCacheFiles(URI[] files) { 1078 ensureState(JobState.DEFINE); 1079 DistributedCache.setCacheFiles(files, conf); 1080 } 1081 1082 /** 1083 * Add a archives to be localized 1084 * @param uri The uri of the cache to be localized 1085 */ 1086 public void addCacheArchive(URI uri) { 1087 ensureState(JobState.DEFINE); 1088 DistributedCache.addCacheArchive(uri, conf); 1089 } 1090 1091 /** 1092 * Add a file to be localized 1093 * @param uri The uri of the cache to be localized 1094 */ 1095 public void addCacheFile(URI uri) { 1096 ensureState(JobState.DEFINE); 1097 DistributedCache.addCacheFile(uri, conf); 1098 } 1099 1100 /** 1101 * Add an file path to the current set of classpath entries It adds the file 1102 * to cache as well. 1103 * 1104 * Files added with this method will not be unpacked while being added to the 1105 * classpath. 1106 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)} 1107 * method instead. 1108 * 1109 * @param file Path of the file to be added 1110 */ 1111 public void addFileToClassPath(Path file) 1112 throws IOException { 1113 ensureState(JobState.DEFINE); 1114 DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf)); 1115 } 1116 1117 /** 1118 * Add an archive path to the current set of classpath entries. It adds the 1119 * archive to cache as well. 1120 * 1121 * Archive files will be unpacked and added to the classpath 1122 * when being distributed. 1123 * 1124 * @param archive Path of the archive to be added 1125 */ 1126 public void addArchiveToClassPath(Path archive) 1127 throws IOException { 1128 ensureState(JobState.DEFINE); 1129 DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); 1130 } 1131 1132 /** 1133 * Originally intended to enable symlinks, but currently symlinks cannot be 1134 * disabled. 1135 */ 1136 @Deprecated 1137 public void createSymlink() { 1138 ensureState(JobState.DEFINE); 1139 DistributedCache.createSymlink(conf); 1140 } 1141 1142 /** 1143 * Expert: Set the number of maximum attempts that will be made to run a 1144 * map task. 1145 * 1146 * @param n the number of attempts per map task. 1147 */ 1148 public void setMaxMapAttempts(int n) { 1149 ensureState(JobState.DEFINE); 1150 conf.setMaxMapAttempts(n); 1151 } 1152 1153 /** 1154 * Expert: Set the number of maximum attempts that will be made to run a 1155 * reduce task. 1156 * 1157 * @param n the number of attempts per reduce task. 1158 */ 1159 public void setMaxReduceAttempts(int n) { 1160 ensureState(JobState.DEFINE); 1161 conf.setMaxReduceAttempts(n); 1162 } 1163 1164 /** 1165 * Set whether the system should collect profiler information for some of 1166 * the tasks in this job? The information is stored in the user log 1167 * directory. 1168 * @param newValue true means it should be gathered 1169 */ 1170 public void setProfileEnabled(boolean newValue) { 1171 ensureState(JobState.DEFINE); 1172 conf.setProfileEnabled(newValue); 1173 } 1174 1175 /** 1176 * Set the profiler configuration arguments. If the string contains a '%s' it 1177 * will be replaced with the name of the profiling output file when the task 1178 * runs. 1179 * 1180 * This value is passed to the task child JVM on the command line. 1181 * 1182 * @param value the configuration string 1183 */ 1184 public void setProfileParams(String value) { 1185 ensureState(JobState.DEFINE); 1186 conf.setProfileParams(value); 1187 } 1188 1189 /** 1190 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1191 * must also be called. 1192 * @param newValue a set of integer ranges of the map ids 1193 */ 1194 public void setProfileTaskRange(boolean isMap, String newValue) { 1195 ensureState(JobState.DEFINE); 1196 conf.setProfileTaskRange(isMap, newValue); 1197 } 1198 1199 private void ensureNotSet(String attr, String msg) throws IOException { 1200 if (conf.get(attr) != null) { 1201 throw new IOException(attr + " is incompatible with " + msg + " mode."); 1202 } 1203 } 1204 1205 /** 1206 * Sets the flag that will allow the JobTracker to cancel the HDFS delegation 1207 * tokens upon job completion. Defaults to true. 1208 */ 1209 public void setCancelDelegationTokenUponJobCompletion(boolean value) { 1210 ensureState(JobState.DEFINE); 1211 conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value); 1212 } 1213 1214 /** 1215 * Default to the new APIs unless they are explicitly set or the old mapper or 1216 * reduce attributes are used. 1217 * @throws IOException if the configuration is inconsistant 1218 */ 1219 private void setUseNewAPI() throws IOException { 1220 int numReduces = conf.getNumReduceTasks(); 1221 String oldMapperClass = "mapred.mapper.class"; 1222 String oldReduceClass = "mapred.reducer.class"; 1223 conf.setBooleanIfUnset("mapred.mapper.new-api", 1224 conf.get(oldMapperClass) == null); 1225 if (conf.getUseNewMapper()) { 1226 String mode = "new map API"; 1227 ensureNotSet("mapred.input.format.class", mode); 1228 ensureNotSet(oldMapperClass, mode); 1229 if (numReduces != 0) { 1230 ensureNotSet("mapred.partitioner.class", mode); 1231 } else { 1232 ensureNotSet("mapred.output.format.class", mode); 1233 } 1234 } else { 1235 String mode = "map compatability"; 1236 ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode); 1237 ensureNotSet(MAP_CLASS_ATTR, mode); 1238 if (numReduces != 0) { 1239 ensureNotSet(PARTITIONER_CLASS_ATTR, mode); 1240 } else { 1241 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); 1242 } 1243 } 1244 if (numReduces != 0) { 1245 conf.setBooleanIfUnset("mapred.reducer.new-api", 1246 conf.get(oldReduceClass) == null); 1247 if (conf.getUseNewReducer()) { 1248 String mode = "new reduce API"; 1249 ensureNotSet("mapred.output.format.class", mode); 1250 ensureNotSet(oldReduceClass, mode); 1251 } else { 1252 String mode = "reduce compatability"; 1253 ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); 1254 ensureNotSet(REDUCE_CLASS_ATTR, mode); 1255 } 1256 } 1257 } 1258 1259 private synchronized void connect() 1260 throws IOException, InterruptedException, ClassNotFoundException { 1261 if (cluster == null) { 1262 cluster = 1263 ugi.doAs(new PrivilegedExceptionAction<Cluster>() { 1264 public Cluster run() 1265 throws IOException, InterruptedException, 1266 ClassNotFoundException { 1267 return new Cluster(getConfiguration()); 1268 } 1269 }); 1270 } 1271 } 1272 1273 boolean isConnected() { 1274 return cluster != null; 1275 } 1276 1277 /** Only for mocking via unit tests. */ 1278 @Private 1279 public JobSubmitter getJobSubmitter(FileSystem fs, 1280 ClientProtocol submitClient) throws IOException { 1281 return new JobSubmitter(fs, submitClient); 1282 } 1283 /** 1284 * Submit the job to the cluster and return immediately. 1285 * @throws IOException 1286 */ 1287 public void submit() 1288 throws IOException, InterruptedException, ClassNotFoundException { 1289 ensureState(JobState.DEFINE); 1290 setUseNewAPI(); 1291 connect(); 1292 final JobSubmitter submitter = 1293 getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); 1294 status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { 1295 public JobStatus run() throws IOException, InterruptedException, 1296 ClassNotFoundException { 1297 return submitter.submitJobInternal(Job.this, cluster); 1298 } 1299 }); 1300 state = JobState.RUNNING; 1301 LOG.info("The url to track the job: " + getTrackingURL()); 1302 } 1303 1304 /** 1305 * Submit the job to the cluster and wait for it to finish. 1306 * @param verbose print the progress to the user 1307 * @return true if the job succeeded 1308 * @throws IOException thrown if the communication with the 1309 * <code>JobTracker</code> is lost 1310 */ 1311 public boolean waitForCompletion(boolean verbose 1312 ) throws IOException, InterruptedException, 1313 ClassNotFoundException { 1314 if (state == JobState.DEFINE) { 1315 submit(); 1316 } 1317 if (verbose) { 1318 monitorAndPrintJob(); 1319 } else { 1320 // get the completion poll interval from the client. 1321 int completionPollIntervalMillis = 1322 Job.getCompletionPollInterval(cluster.getConf()); 1323 while (!isComplete()) { 1324 try { 1325 Thread.sleep(completionPollIntervalMillis); 1326 } catch (InterruptedException ie) { 1327 } 1328 } 1329 } 1330 return isSuccessful(); 1331 } 1332 1333 /** 1334 * Monitor a job and print status in real-time as progress is made and tasks 1335 * fail. 1336 * @return true if the job succeeded 1337 * @throws IOException if communication to the JobTracker fails 1338 */ 1339 public boolean monitorAndPrintJob() 1340 throws IOException, InterruptedException { 1341 String lastReport = null; 1342 Job.TaskStatusFilter filter; 1343 Configuration clientConf = getConfiguration(); 1344 filter = Job.getTaskOutputFilter(clientConf); 1345 JobID jobId = getJobID(); 1346 LOG.info("Running job: " + jobId); 1347 int eventCounter = 0; 1348 boolean profiling = getProfileEnabled(); 1349 IntegerRanges mapRanges = getProfileTaskRange(true); 1350 IntegerRanges reduceRanges = getProfileTaskRange(false); 1351 int progMonitorPollIntervalMillis = 1352 Job.getProgressPollInterval(clientConf); 1353 /* make sure to report full progress after the job is done */ 1354 boolean reportedAfterCompletion = false; 1355 boolean reportedUberMode = false; 1356 while (!isComplete() || !reportedAfterCompletion) { 1357 if (isComplete()) { 1358 reportedAfterCompletion = true; 1359 } else { 1360 Thread.sleep(progMonitorPollIntervalMillis); 1361 } 1362 if (status.getState() == JobStatus.State.PREP) { 1363 continue; 1364 } 1365 if (!reportedUberMode) { 1366 reportedUberMode = true; 1367 LOG.info("Job " + jobId + " running in uber mode : " + isUber()); 1368 } 1369 String report = 1370 (" map " + StringUtils.formatPercent(mapProgress(), 0)+ 1371 " reduce " + 1372 StringUtils.formatPercent(reduceProgress(), 0)); 1373 if (!report.equals(lastReport)) { 1374 LOG.info(report); 1375 lastReport = report; 1376 } 1377 1378 TaskCompletionEvent[] events = 1379 getTaskCompletionEvents(eventCounter, 10); 1380 eventCounter += events.length; 1381 printTaskEvents(events, filter, profiling, mapRanges, reduceRanges); 1382 } 1383 boolean success = isSuccessful(); 1384 if (success) { 1385 LOG.info("Job " + jobId + " completed successfully"); 1386 } else { 1387 LOG.info("Job " + jobId + " failed with state " + status.getState() + 1388 " due to: " + status.getFailureInfo()); 1389 } 1390 Counters counters = getCounters(); 1391 if (counters != null) { 1392 LOG.info(counters.toString()); 1393 } 1394 return success; 1395 } 1396 1397 /** 1398 * @return true if the profile parameters indicate that this is using 1399 * hprof, which generates profile files in a particular location 1400 * that we can retrieve to the client. 1401 */ 1402 private boolean shouldDownloadProfile() { 1403 // Check the argument string that was used to initialize profiling. 1404 // If this indicates hprof and file-based output, then we're ok to 1405 // download. 1406 String profileParams = getProfileParams(); 1407 1408 if (null == profileParams) { 1409 return false; 1410 } 1411 1412 // Split this on whitespace. 1413 String [] parts = profileParams.split("[ \\t]+"); 1414 1415 // If any of these indicate hprof, and the use of output files, return true. 1416 boolean hprofFound = false; 1417 boolean fileFound = false; 1418 for (String p : parts) { 1419 if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) { 1420 hprofFound = true; 1421 1422 // This contains a number of comma-delimited components, one of which 1423 // may specify the file to write to. Make sure this is present and 1424 // not empty. 1425 String [] subparts = p.split(","); 1426 for (String sub : subparts) { 1427 if (sub.startsWith("file=") && sub.length() != "file=".length()) { 1428 fileFound = true; 1429 } 1430 } 1431 } 1432 } 1433 1434 return hprofFound && fileFound; 1435 } 1436 1437 private void printTaskEvents(TaskCompletionEvent[] events, 1438 Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges, 1439 IntegerRanges reduceRanges) throws IOException, InterruptedException { 1440 for (TaskCompletionEvent event : events) { 1441 switch (filter) { 1442 case NONE: 1443 break; 1444 case SUCCEEDED: 1445 if (event.getStatus() == 1446 TaskCompletionEvent.Status.SUCCEEDED) { 1447 LOG.info(event.toString()); 1448 } 1449 break; 1450 case FAILED: 1451 if (event.getStatus() == 1452 TaskCompletionEvent.Status.FAILED) { 1453 LOG.info(event.toString()); 1454 // Displaying the task diagnostic information 1455 TaskAttemptID taskId = event.getTaskAttemptId(); 1456 String[] taskDiagnostics = getTaskDiagnostics(taskId); 1457 if (taskDiagnostics != null) { 1458 for (String diagnostics : taskDiagnostics) { 1459 System.err.println(diagnostics); 1460 } 1461 } 1462 } 1463 break; 1464 case KILLED: 1465 if (event.getStatus() == TaskCompletionEvent.Status.KILLED){ 1466 LOG.info(event.toString()); 1467 } 1468 break; 1469 case ALL: 1470 LOG.info(event.toString()); 1471 break; 1472 } 1473 } 1474 } 1475 1476 /** The interval at which monitorAndPrintJob() prints status */ 1477 public static int getProgressPollInterval(Configuration conf) { 1478 // Read progress monitor poll interval from config. Default is 1 second. 1479 int progMonitorPollIntervalMillis = conf.getInt( 1480 PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL); 1481 if (progMonitorPollIntervalMillis < 1) { 1482 LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 1483 " has been set to an invalid value; " 1484 + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL); 1485 progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL; 1486 } 1487 return progMonitorPollIntervalMillis; 1488 } 1489 1490 /** The interval at which waitForCompletion() should check. */ 1491 public static int getCompletionPollInterval(Configuration conf) { 1492 int completionPollIntervalMillis = conf.getInt( 1493 COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL); 1494 if (completionPollIntervalMillis < 1) { 1495 LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 1496 " has been set to an invalid value; " 1497 + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL); 1498 completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL; 1499 } 1500 return completionPollIntervalMillis; 1501 } 1502 1503 /** 1504 * Get the task output filter. 1505 * 1506 * @param conf the configuration. 1507 * @return the filter level. 1508 */ 1509 public static TaskStatusFilter getTaskOutputFilter(Configuration conf) { 1510 return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED")); 1511 } 1512 1513 /** 1514 * Modify the Configuration to set the task output filter. 1515 * 1516 * @param conf the Configuration to modify. 1517 * @param newValue the value to set. 1518 */ 1519 public static void setTaskOutputFilter(Configuration conf, 1520 TaskStatusFilter newValue) { 1521 conf.set(Job.OUTPUT_FILTER, newValue.toString()); 1522 } 1523 1524 public boolean isUber() throws IOException, InterruptedException { 1525 ensureState(JobState.RUNNING); 1526 updateStatus(); 1527 return status.isUber(); 1528 } 1529 1530 /** 1531 * Get the reservation to which the job is submitted to, if any 1532 * 1533 * @return the reservationId the identifier of the job's reservation, null if 1534 * the job does not have any reservation associated with it 1535 */ 1536 public ReservationId getReservationId() { 1537 return reservationId; 1538 } 1539 1540 /** 1541 * Set the reservation to which the job is submitted to 1542 * 1543 * @param reservationId the reservationId to set 1544 */ 1545 public void setReservationId(ReservationId reservationId) { 1546 this.reservationId = reservationId; 1547 } 1548 1549}