/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     // Specify input/output paths via the lib.input/lib.output helpers
 *     // (Job itself has no setInputPath/setOutputPath methods)
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the cluster argument, which is ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }
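
  // Illustrative usage (a minimal sketch, not part of the API): the
  // getInstance() factories above are the supported way to construct a Job.
  // Because Job copies the Configuration, later edits to "conf" are not
  // visible to the job.
  //
  //   Configuration conf = new Configuration();
  //   Job job = Job.getInstance(conf, "my-job");
  //   conf.setInt("some.key", 42);   // NOT seen by job, which holds a copy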

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the cluster argument, which is ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the Job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }
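
  // Illustrative usage (a minimal sketch): once a job has been submitted,
  // the getters above fetch a fresh JobStatus from the cluster.
  //
  //   job.submit();
  //   System.out.println("Tracking at " + job.getTrackingURL());
  //   if (job.getJobState() == JobStatus.State.FAILED) {
  //     System.err.println("Job failed");
  //   }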

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // fall through and report whatever status we already have
    } catch (InterruptedException ie) {
      // fall through and report whatever status we already have
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "Job is failed due to some other reason and reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return an array of reports for the tasks of the given type.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job. Works while the job is being defined and,
   * via the cluster client, after it has been submitted.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }
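
  // Illustrative usage (a minimal sketch): polling the progress getters
  // above from a client thread. mapProgress()/reduceProgress() refresh
  // their cached status at most every MAX_JOBSTATUS_AGE milliseconds.
  //
  //   while (!job.isComplete()) {
  //     System.out.printf("map %.0f%% reduce %.0f%%%n",
  //         job.mapProgress() * 100, job.reduceProgress() * 100);
  //     Thread.sleep(1000);   // handle InterruptedException as appropriate
  //   }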

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
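
  // Illustrative usage (a minimal sketch): reading a built-in counter once
  // the job has finished. getCounters() may return null for retired jobs,
  // so the null check matters.
  //
  //   Counters counters = job.getCounters();
  //   if (counters != null) {
  //     long inRecords =
  //         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
  //     System.out.println("map input records: " + inRecords);
  //   }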

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }
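
  // Illustrative usage (a minimal sketch of a secondary sort; CompositeKey,
  // FullKeyComparator and NaturalKeyGrouper are hypothetical user classes):
  // sort on the full composite key but group reducer input by its natural
  // part, using the comparator setters above.
  //
  //   job.setMapOutputKeyClass(CompositeKey.class);
  //   job.setSortComparatorClass(FullKeyComparator.class);
  //   job.setGroupingComparatorClass(NaturalKeyGrouper.class);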

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }
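
  // Illustrative usage (a minimal sketch; the paths and the "#geo.dat"
  // symlink fragment are made up): shipping side data and an extra jar
  // with the job via the distributed-cache setters above.
  //
  //   job.addCacheFile(new URI("/apps/lookup/geo.dat#geo.dat"));
  //   job.addFileToClassPath(new Path("/apps/libs/helper.jar"));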

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap true if the ranges apply to map tasks, false for reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException,
                     ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // keep polling; the completion check above drives loop exit
        }
      }
    }
    return isSuccessful();
  }
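
  // Illustrative usage (a minimal sketch of a driver's tail): submit() for
  // fire-and-forget, or waitForCompletion() to block and report progress.
  //
  //   job.submit();                                  // returns immediately
  //   // ... or ...
  //   System.exit(job.waitForCompletion(true) ? 0 : 1);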

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }
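
  // Illustrative usage (a minimal sketch): tuning the client-side knobs that
  // drive the monitoring loop above, via keys and helpers defined in this
  // class (setTaskOutputFilter is defined just below).
  //
  //   Configuration conf = job.getConfiguration();
  //   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 5000);
  //   Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);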

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId the identifier of the job's reservation, null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}