/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the name of the job
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }
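  // A minimal construction sketch using the non-deprecated factories above
  // (the driver class name is hypothetical). Because getInstance(Configuration)
  // copies the Configuration, changes made to conf after this point are not
  // visible to the job:
  //
  //   Configuration conf = new Configuration();
  //   Job job = Job.getInstance(conf, "my-job");
  //   job.setJarByClass(MyDriver.class);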
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return JobStatus#State
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }
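  // Sketch: reading submitted-job state through the getters above. getStatus()
  // and getJobState() refresh over RPC on every call, while getters such as
  // getTrackingURL() reuse the cached JobStatus:
  //
  //   JobStatus s = job.getStatus();
  //   System.out.println(s.getState() + " - " + job.getTrackingURL());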
  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }
  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason, which "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times. " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }
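  // Sketch: tallying finished map tasks from the reports above. Assumes a
  // RUNNING job; TaskReport and TIPStatus are part of this package:
  //
  //   int done = 0;
  //   for (TaskReport r : job.getTaskReports(TaskType.MAP)) {
  //     if (r.getCurrentStatus() == TIPStatus.COMPLETE) {
  //       done++;
  //     }
  //   }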
  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }
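  // Sketch: paging through completion events ten at a time, the same pattern
  // monitorAndPrintJob() uses further below:
  //
  //   int from = 0;
  //   TaskCompletionEvent[] batch;
  //   do {
  //     batch = job.getTaskCompletionEvents(from, 10);
  //     from += batch.length;
  //   } while (batch.length > 0);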
  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
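  // Sketch: reading a built-in counter once the job has run. TaskCounter is
  // this package's enum of per-task counters; getCounters() may return null
  // for a retired job, so check first:
  //
  //   Counters counters = job.getCounters();
  //   if (counters != null) {
  //     long mapRecords =
  //         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
  //   }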
  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }
  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }
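  // Sketch: the two comparator hooks below combine into the usual
  // "secondary sort" recipe (the comparator classes are hypothetical): sort
  // on the full composite key, then group reduce input by its natural part.
  //
  //   job.setSortComparatorClass(CompositeKeyComparator.class);
  //   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);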
  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }
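  // Sketch of a typical type setup built from the setters above (the mapper,
  // reducer, and key/value classes are hypothetical). A combiner must be a
  // Reducer whose input and output key/value types both match the map output
  // types:
  //
  //   job.setMapperClass(TokenMapper.class);
  //   job.setCombinerClass(SumReducer.class);
  //   job.setReducerClass(SumReducer.class);
  //   job.setMapOutputKeyClass(Text.class);
  //   job.setMapOutputValueClass(IntWritable.class);
  //   job.setOutputKeyClass(Text.class);
  //   job.setOutputValueClass(IntWritable.class);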
  /**
   * Specify whether job-setup and job-cleanup is needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }
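  // Sketch: shipping side data with the DistributedCache helpers above (the
  // URIs are hypothetical). Files are localized as-is, archives are unpacked
  // on the task node, and a "#name" fragment chooses the symlink name in the
  // task's working directory:
  //
  //   job.addCacheFile(new URI("hdfs:///apps/lookup/dict.txt#dict"));
  //   job.addCacheArchive(new URI("hdfs:///apps/lookup/libs.zip#libs"));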
  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the ranges apply to map tasks or reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                       throws IOException, InterruptedException,
                              ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
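  // Sketch of the conventional driver exit built on waitForCompletion()
  // (the driver class and its configuration are hypothetical):
  //
  //   public static void main(String[] args) throws Exception {
  //     Job job = Job.getInstance(new Configuration(), "myjob");
  //     // ... set mapper, reducer, input/output paths ...
  //     System.exit(job.waitForCompletion(true) ? 0 : 1);
  //   }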
  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
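  // Sketch: turning on hprof profiling for the first two tasks of each type;
  // shouldDownloadProfile() below looks for exactly this
  // "-agentlib:hprof...file=" shape, and the '%s' is replaced with the
  // profile output file name at task launch (the hprof option string is an
  // assumption, not a value defined in this class):
  //
  //   job.setProfileEnabled(true);
  //   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
  //       "force=n,thread=y,verbose=n,file=%s");
  //   job.setProfileTaskRange(true, "0-1");   // maps
  //   job.setProfileTaskRange(false, "0-1");  // reduces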
  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}