/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information etc.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory
 *   on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job typically goes to the distributed file-system, and that
 * can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus on ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the caller then polls
 *   the returned handle to the {@link RunningJob} to query status and make
 *   scheduling decisions.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   on job-completion, thus avoiding polling.
 *   </li>
 * </ol></p>
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...

  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile.  If the job is null, then we cannot
     * perform any of the tasks.  The job might be null if the cluster
     * has completely forgotten about the job.  (e.g., 24 hours after the
     * job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications should
     * rather use {@link #getID()}.*/
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }
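
    // Illustrative sketch (not part of the original source): a client holding a
    // RunningJob handle -- for example from JobClient#submitJob(JobConf) -- can
    // block for completion and then read counters and diagnostics through the
    // methods above. The counter group/name strings here are hypothetical.
    //
    //   RunningJob running = jobClient.submitJob(conf);
    //   running.waitForCompletion();
    //   if (running.isSuccessful()) {
    //     Counters counters = running.getCounters();
    //     long records =
    //         counters.findCounter("my-group", "my-counter").getValue();
    //   } else {
    //     System.err.println(running.getFailureInfo());
    //   }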
    /** Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to failed tasks list,
     *                   otherwise it is just killed, w/o affecting job failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;
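
  // Illustrative sketch (not part of the original source): a typical client
  // builds a JobClient from its configuration and closes it when done, e.g.:
  //
  //   JobConf conf = new JobConf(MyJob.class);
  //   JobClient client = new JobClient(conf);
  //   try {
  //     // ... submit jobs, query the cluster, etc.
  //   } finally {
  //     client.close();
  //   }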
  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });

      Cluster prev = cluster;
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();

      // It is important to close the previous cluster instance
      // to cleanup resources.
      if (prev != null) {
        prev.close();
      }
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get an {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
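
  // Illustrative sketch (not part of the original source): given a job id
  // string, a client can re-attach to the job and poll its progress through
  // the RunningJob handle. The id value shown is hypothetical.
  //
  //   RunningJob running = jobClient.getJob(JobID.forName("job_201301010000_0001"));
  //   if (running != null) {
  //     while (!running.isComplete()) {
  //       System.out.printf("map %.0f%% reduce %.0f%%%n",
  //           running.mapProgress() * 100, running.reduceProgress() * 100);
  //       Thread.sleep(5000);
  //     }
  //   }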
  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use {@link #getMapTaskReports(JobID)} */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use {@link #getReduceTaskReports(JobID)} */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   *              (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
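
  // Illustrative sketch (not part of the original source): inspecting per-task
  // progress for a job. getTaskID() and getProgress() are assumed from the
  // classic TaskReport API.
  //
  //   for (TaskReport report : jobClient.getMapTaskReports(jobId)) {
  //     System.out.println(report.getTaskID() + " is "
  //         + (report.getProgress() * 100) + "% complete");
  //   }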
  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
              .getBlackListedTaskTrackerCount(), cluster
              .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(), metrics
              .getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *                 tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }
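
  // Illustrative sketch (not part of the original source): a quick capacity
  // check. getTaskTrackers(), getMaxMapTasks() and getMaxReduceTasks() are
  // assumed from the classic ClusterStatus API.
  //
  //   ClusterStatus status = jobClient.getClusterStatus();
  //   System.out.println("trackers=" + status.getTaskTrackers()
  //       + " map slots=" + status.getMaxMapTasks()
  //       + " reduce slots=" + status.getMaxReduceTasks());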
  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks.  Only those tasks are printed whose
   * output matches the filter.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
                                            "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
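
  // Illustrative sketch (not part of the original source): choosing which
  // task output is echoed to the client console before running a job, using
  // the static filter accessors defined above.
  //
  //   JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
  //   assert JobClient.getTaskOutputFilter(job) == TaskStatusFilter.ALL;
  //   JobClient.runJob(job);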
  /**
   * Returns task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }
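
  // Illustrative sketch (not part of the original source): a common pattern is
  // to size the reduce phase relative to the cluster's reduce slot capacity
  // before submitting, e.g. using roughly 90% of the available slots.
  //
  //   int reduceCapacity = jobClient.getDefaultReduces();
  //   conf.setNumReduceTasks((int) (reduceCapacity * 0.9));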
  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about root level queues
   * configured.
   *
   * @return the array of root level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about immediate children
   * of queue queueName.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
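
  // Illustrative sketch (not part of the original source): walking the queue
  // hierarchy. getQueueName() is assumed from the JobQueueInfo API.
  //
  //   for (JobQueueInfo root : jobClient.getRootQueues()) {
  //     System.out.println(root.getQueueName());
  //     for (JobQueueInfo child : jobClient.getChildQueues(root.getQueueName())) {
  //       System.out.println("  " + child.getQueueName());
  //     }
  //   }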
  /**
   * Gets the queue information associated to a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated to particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the Queue ACLs for current user.
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new
              PrivilegedExceptionAction<org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer) throws IOException, InterruptedException {
    return clientUgi.doAs(new
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
      ) throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
      ) throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String argv[]) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}