/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }
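
  /*
   * Illustrative sketch (not part of the API; the property name is an
   * arbitrary example): because getInstance(Configuration) copies the passed
   * Configuration, later changes to the caller's conf object are not seen by
   * the job.
   *
   *   Configuration conf = new Configuration();
   *   conf.set("mapreduce.job.queuename", "default");
   *   Job job = Job.getInstance(conf);
   *   conf.set("mapreduce.job.queuename", "other");
   *   // job.getConfiguration().get("mapreduce.job.queuename") is still "default"
   */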

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration} only
   * when it's needed.
   *
   * @param ignored cluster, no longer used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored cluster, no longer used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }
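
  /*
   * Illustrative sketch: the set-methods only work while the job is in the
   * DEFINE state; once submitted, they throw IllegalStateException (enforced
   * by ensureState below).
   *
   *   Job job = Job.getInstance(new Configuration());
   *   job.setJobName("before");  // OK: job is still in JobState.DEFINE
   *   job.submit();
   *   job.setJobName("after");   // throws IllegalStateException
   */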

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException, InterruptedException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return JobStatus#State
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }
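
  /*
   * Illustrative sketch: querying a submitted job with the getters above.
   * getJobState() refreshes the cached status over RPC; getTrackingURL()
   * and getJobFile() read the cached copy.
   *
   *   job.submit();
   *   System.out.println("Tracking URL: " + job.getTrackingURL());
   *   JobStatus.State state = job.getJobState();
   *   System.out.println("Job file: " + job.getJobFile() + ", state: " + state);
   */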

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }
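
  /*
   * Illustrative sketch: rough wall-clock duration of a finished job from
   * the getters above. Note that getFinishTime() refreshes the status while
   * getStartTime() reads the cached copy.
   *
   *   long elapsedMillis = job.getFinishTime() - job.getStartTime();
   */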

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: report whatever status information is already available
    } catch (InterruptedException ie) {
      // ignore: report whatever status information is already available
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given task type.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }
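
  /*
   * Illustrative sketch: counting completed map tasks from the task reports
   * returned above. Assumes TIPStatus from org.apache.hadoop.mapred and a
   * submitted job named "job".
   *
   *   int done = 0;
   *   for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *     if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
   *       done++;
   *     }
   *   }
   */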

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    cluster.getClient().killJob(getJobID());
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }
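
  /*
   * Illustrative sketch: a simple client-side progress poll built on the
   * getters above (a hand-rolled variant of what waitForCompletion(true)
   * does via monitorAndPrintJob()).
   *
   *   while (!job.isComplete()) {
   *     System.out.printf("map %.0f%% reduce %.0f%%%n",
   *         job.mapProgress() * 100, job.reduceProgress() * 100);
   *     Thread.sleep(5000);
   *   }
   */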

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }
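
  /*
   * Illustrative sketch: paging through all completion events of a finished
   * job, ten at a time, the same way monitorAndPrintJob() further below
   * consumes them.
   *
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from, 10);
   *     from += events.length;
   *     for (TaskCompletionEvent event : events) {
   *       System.out.println(event);
   *     }
   *   } while (events.length > 0);
   */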

  /**
   * Kill the indicated task attempt.
   *
   * @param taskId the id of the task attempt to be terminated.
   * @return <code>true</code> if the task attempt was killed.
   * @throws IOException
   */
  public boolean killTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, false);
      }
    });
  }

  /**
   * Fail the indicated task attempt.
   *
   * @param taskId the id of the task attempt to be terminated.
   * @return <code>true</code> if the task attempt was marked as failed.
   * @throws IOException
   */
  public boolean failTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, true);
      }
    });
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
      @Override
      public Counters run() throws IOException, InterruptedException {
        return cluster.getClient().getJobCounters(getJobID());
      }
    });
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }
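
  /*
   * Illustrative sketch: a typical wiring of the setters in this class for a
   * word-count style job. MyMapper and MyReducer are placeholders; Text and
   * IntWritable are from org.apache.hadoop.io.
   *
   *   job.setMapperClass(MyMapper.class);
   *   job.setCombinerClass(MyReducer.class);
   *   job.setReducerClass(MyReducer.class);
   *   job.setMapOutputKeyClass(Text.class);
   *   job.setMapOutputValueClass(IntWritable.class);
   *   job.setOutputKeyClass(Text.class);
   *   job.setOutputValueClass(IntWritable.class);
   */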

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives to be localized.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files to be localized.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the archive to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the file to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the
   * file to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }
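
  /*
   * Illustrative sketch: shipping side data with a job via the methods
   * above. Paths are placeholders, and the URI constructor throws
   * URISyntaxException, which the caller must handle.
   *
   *   job.addCacheFile(new URI("hdfs:///data/lookup.dat"));
   *   job.addCacheArchive(new URI("hdfs:///data/dictionary.zip"));
   *   job.addFileToClassPath(new Path("/libs/extra.jar"));
   */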

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the range applies to map tasks (true) or reduce
   *              tasks (false)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }
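
  /*
   * Illustrative sketch: profiling the first three map tasks with hprof.
   * The params string is the stock hprof example; "%s" is replaced with the
   * profile output file name, as described in setProfileParams above.
   *
   *   job.setProfileEnabled(true);
   *   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,"
   *       + "force=n,thread=y,verbose=n,file=%s");
   *   job.setProfileTaskRange(true, "0-2");
   */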

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
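
  /*
   * Illustrative sketch: the usual tail of a driver main() built on submit()
   * above and waitForCompletion(boolean) below; exits nonzero if the job
   * fails.
   *
   *   System.exit(job.waitForCompletion(true) ? 0 : 1);
   */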

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0) +
         " reduce " +
         StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }
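
  /*
   * Illustrative sketch: widening the client-side filter via the helpers
   * above so that monitorAndPrintJob() logs every task-completion event
   * rather than only failed ones (the default).
   *
   *   Job.setTaskOutputFilter(job.getConfiguration(), Job.TaskStatusFilter.ALL);
   *   job.waitForCompletion(true);
   */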

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}