001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapred; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.net.InetSocketAddress; 023import java.net.URL; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.List; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.io.Text; 036import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo; 037import org.apache.hadoop.mapreduce.Cluster; 038import org.apache.hadoop.mapreduce.ClusterMetrics; 039import org.apache.hadoop.mapreduce.Job; 040import org.apache.hadoop.mapreduce.QueueInfo; 041import org.apache.hadoop.mapreduce.TaskTrackerInfo; 042import org.apache.hadoop.mapreduce.TaskType; 043import org.apache.hadoop.mapreduce.filecache.DistributedCache; 044import 
org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 045import org.apache.hadoop.mapreduce.tools.CLI; 046import org.apache.hadoop.mapreduce.util.ConfigUtil; 047import org.apache.hadoop.security.UserGroupInformation; 048import org.apache.hadoop.security.token.SecretManager.InvalidToken; 049import org.apache.hadoop.security.token.Token; 050import org.apache.hadoop.security.token.TokenRenewer; 051import org.apache.hadoop.util.Tool; 052import org.apache.hadoop.util.ToolRunner; 053 054/** 055 * <code>JobClient</code> is the primary interface for the user-job to interact 056 * with the cluster. 057 * 058 * <code>JobClient</code> provides facilities to submit jobs, track their 059 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster 060 * status information etc. 061 * 062 * <p>The job submission process involves: 063 * <ol> 064 * <li> 065 * Checking the input and output specifications of the job. 066 * </li> 067 * <li> 068 * Computing the {@link InputSplit}s for the job. 069 * </li> 070 * <li> 071 * Setting up the requisite accounting information for the {@link DistributedCache} 072 * of the job, if necessary. 073 * </li> 074 * <li> 075 * Copying the job's jar and configuration to the map-reduce system directory 076 * on the distributed file-system. 077 * </li> 078 * <li> 079 * Submitting the job to the cluster and optionally monitoring 080 * its status. 081 * </li> 082 * </ol> 083 * 084 * Normally the user creates the application, describes various facets of the 085 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 086 * the job and monitor its progress.
087 * 088 * <p>Here is an example of how to use <code>JobClient</code>:</p> 089 * <p><blockquote><pre> 090 * // Create a new JobConf 091 * JobConf job = new JobConf(new Configuration(), MyJob.class); 092 * 093 * // Specify various job-specific parameters 094 * job.setJobName("myjob"); 095 * 096 * FileInputFormat.setInputPaths(job, new Path("in")); 097 * FileOutputFormat.setOutputPath(job, new Path("out")); 098 * 099 * job.setMapperClass(MyJob.MyMapper.class); 100 * job.setReducerClass(MyJob.MyReducer.class); 101 * 102 * // Submit the job, then poll for progress until the job is complete 103 * JobClient.runJob(job); 104 * </pre></blockquote> 105 * 106 * <b id="JobControl">Job Control</b> 107 * 108 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 109 * which cannot be done via a single map-reduce job. This is fairly easy since 110 * the output of the job, typically, goes to the distributed file-system and that 111 * can be used as the input for the next job.</p> 112 * 113 * <p>However, this also means that the onus of ensuring jobs are complete 114 * (success/failure) lies squarely on the clients. In such situations the 115 * various job-control options are: 116 * <ol> 117 * <li> 118 * {@link #runJob(JobConf)} : submits the job and returns only after 119 * the job has completed. 120 * </li> 121 * <li> 122 * {@link #submitJob(JobConf)} : only submits the job; the caller then polls the 123 * returned handle to the {@link RunningJob} to query status and make 124 * scheduling decisions. 125 * </li> 126 * <li> 127 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification 128 * on job-completion, thus avoiding polling.
129 * </li> 130 * </ol> 131 * 132 * @see JobConf 133 * @see ClusterStatus 134 * @see Tool 135 * @see DistributedCache 136 */ 137@InterfaceAudience.Public 138@InterfaceStability.Stable 139public class JobClient extends CLI { 140 141 @InterfaceAudience.Private 142 public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY = 143 "mapreduce.jobclient.retry.policy.enabled"; 144 @InterfaceAudience.Private 145 public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = 146 false; 147 @InterfaceAudience.Private 148 public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY = 149 "mapreduce.jobclient.retry.policy.spec"; 150 @InterfaceAudience.Private 151 public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT = 152 "10000,6,60000,10"; // t1,n1,t2,n2,... 153 154 public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } 155 private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 156 157 static{ 158 ConfigUtil.loadResources(); 159 } 160 161 /** 162 * A NetworkedJob is an implementation of RunningJob. It holds 163 * a JobProfile object to provide some info, and interacts with the 164 * remote service to provide certain functionality. 165 */ 166 static class NetworkedJob implements RunningJob { 167 Job job; 168 /** 169 * We store a JobProfile and a timestamp for when we last 170 * acquired the job profile. If the job is null, then we cannot 171 * perform any of the tasks. The job might be null if the cluster 172 * has completely forgotten about the job. (eg, 24 hours after the 173 * job completes.) 
174 */ 175 public NetworkedJob(JobStatus status, Cluster cluster) throws IOException { 176 this(status, cluster, new JobConf(status.getJobFile())); 177 } 178 179 private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf) 180 throws IOException { 181 this(Job.getInstance(cluster, status, conf)); 182 } 183 184 public NetworkedJob(Job job) throws IOException { 185 this.job = job; 186 } 187 188 public Configuration getConfiguration() { 189 return job.getConfiguration(); 190 } 191 192 /** 193 * An identifier for the job 194 */ 195 public JobID getID() { 196 return JobID.downgrade(job.getJobID()); 197 } 198 199 /** @deprecated This method is deprecated and will be removed. Applications should 200 * rather use {@link #getID()}.*/ 201 @Deprecated 202 public String getJobID() { 203 return getID().toString(); 204 } 205 206 /** 207 * The user-specified job name 208 */ 209 public String getJobName() { 210 return job.getJobName(); 211 } 212 213 /** 214 * The name of the job file 215 */ 216 public String getJobFile() { 217 return job.getJobFile(); 218 } 219 220 /** 221 * A URL where the job's status can be seen 222 */ 223 public String getTrackingURL() { 224 return job.getTrackingURL(); 225 } 226 227 /** 228 * A float between 0.0 and 1.0, indicating the % of map work 229 * completed. 230 */ 231 public float mapProgress() throws IOException { 232 return job.mapProgress(); 233 } 234 235 /** 236 * A float between 0.0 and 1.0, indicating the % of reduce work 237 * completed. 238 */ 239 public float reduceProgress() throws IOException { 240 return job.reduceProgress(); 241 } 242 243 /** 244 * A float between 0.0 and 1.0, indicating the % of cleanup work 245 * completed. 246 */ 247 public float cleanupProgress() throws IOException { 248 try { 249 return job.cleanupProgress(); 250 } catch (InterruptedException ie) { 251 throw new IOException(ie); 252 } 253 } 254 255 /** 256 * A float between 0.0 and 1.0, indicating the % of setup work 257 * completed. 
258 */ 259 public float setupProgress() throws IOException { 260 return job.setupProgress(); 261 } 262 263 /** 264 * Returns immediately whether the whole job is done yet or not. 265 */ 266 public synchronized boolean isComplete() throws IOException { 267 return job.isComplete(); 268 } 269 270 /** 271 * True iff job completed successfully. 272 */ 273 public synchronized boolean isSuccessful() throws IOException { 274 return job.isSuccessful(); 275 } 276 277 /** 278 * Blocks until the job is finished 279 */ 280 public void waitForCompletion() throws IOException { 281 try { 282 job.waitForCompletion(false); 283 } catch (InterruptedException ie) { 284 throw new IOException(ie); 285 } catch (ClassNotFoundException ce) { 286 throw new IOException(ce); 287 } 288 } 289 290 /** 291 * Tells the service to get the state of the current job. 292 */ 293 public synchronized int getJobState() throws IOException { 294 try { 295 return job.getJobState().getValue(); 296 } catch (InterruptedException ie) { 297 throw new IOException(ie); 298 } 299 } 300 301 /** 302 * Tells the service to terminate the current job. 303 */ 304 public synchronized void killJob() throws IOException { 305 job.killJob(); 306 } 307 308 309 /** Set the priority of the job. 310 * @param priority new priority of the job. 311 */ 312 public synchronized void setJobPriority(String priority) 313 throws IOException { 314 try { 315 job.setPriority( 316 org.apache.hadoop.mapreduce.JobPriority.valueOf(priority)); 317 } catch (InterruptedException ie) { 318 throw new IOException(ie); 319 } 320 } 321 322 /** 323 * Kill indicated task attempt. 324 * @param taskId the id of the task to kill. 325 * @param shouldFail if true the task is failed and added to failed tasks list, otherwise 326 * it is just killed, w/o affecting job failure status. 
327 */ 328 public synchronized void killTask(TaskAttemptID taskId, 329 boolean shouldFail) throws IOException { 330 if (shouldFail) { 331 job.failTask(taskId); 332 } else { 333 job.killTask(taskId); 334 } 335 } 336 337 /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/ 338 @Deprecated 339 public synchronized void killTask(String taskId, boolean shouldFail) throws IOException { 340 killTask(TaskAttemptID.forName(taskId), shouldFail); 341 } 342 343 /** 344 * Fetch task completion events from cluster for this job. 345 */ 346 public synchronized TaskCompletionEvent[] getTaskCompletionEvents( 347 int startFrom) throws IOException { 348 try { 349 org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls = 350 job.getTaskCompletionEvents(startFrom, 10); 351 TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length]; 352 for (int i = 0 ; i < acls.length; i++ ) { 353 ret[i] = TaskCompletionEvent.downgrade(acls[i]); 354 } 355 return ret; 356 } catch (InterruptedException ie) { 357 throw new IOException(ie); 358 } 359 } 360 361 /** 362 * Dump stats to screen 363 */ 364 @Override 365 public String toString() { 366 return job.toString(); 367 } 368 369 /** 370 * Returns the counters for this job 371 */ 372 public Counters getCounters() throws IOException { 373 Counters result = null; 374 org.apache.hadoop.mapreduce.Counters temp = job.getCounters(); 375 if(temp != null) { 376 result = Counters.downgrade(temp); 377 } 378 return result; 379 } 380 381 @Override 382 public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException { 383 try { 384 return job.getTaskDiagnostics(id); 385 } catch (InterruptedException ie) { 386 throw new IOException(ie); 387 } 388 } 389 390 public String getHistoryUrl() throws IOException { 391 try { 392 return job.getHistoryUrl(); 393 } catch (InterruptedException ie) { 394 throw new IOException(ie); 395 } 396 } 397 398 public boolean isRetired() throws IOException { 399 try { 400 return job.isRetired(); 
401 } catch (InterruptedException ie) { 402 throw new IOException(ie); 403 } 404 } 405 406 boolean monitorAndPrintJob() throws IOException, InterruptedException { 407 return job.monitorAndPrintJob(); 408 } 409 410 @Override 411 public String getFailureInfo() throws IOException { 412 try { 413 return job.getStatus().getFailureInfo(); 414 } catch (InterruptedException ie) { 415 throw new IOException(ie); 416 } 417 } 418 419 @Override 420 public JobStatus getJobStatus() throws IOException { 421 try { 422 return JobStatus.downgrade(job.getStatus()); 423 } catch (InterruptedException ie) { 424 throw new IOException(ie); 425 } 426 } 427 } 428 429 /** 430 * Ugi of the client. We store this ugi when the client is created and 431 * then make sure that the same ugi is used to run the various protocols. 432 */ 433 UserGroupInformation clientUgi; 434 435 /** 436 * Create a job client. 437 */ 438 public JobClient() { 439 } 440 441 /** 442 * Build a job client with the given {@link JobConf}, and connect to the 443 * default cluster 444 * 445 * @param conf the job configuration. 446 * @throws IOException 447 */ 448 public JobClient(JobConf conf) throws IOException { 449 init(conf); 450 } 451 452 /** 453 * Build a job client with the given {@link Configuration}, 454 * and connect to the default cluster 455 * 456 * @param conf the configuration. 457 * @throws IOException 458 */ 459 public JobClient(Configuration conf) throws IOException { 460 init(new JobConf(conf)); 461 } 462 463 /** 464 * Connect to the default cluster 465 * @param conf the job configuration. 466 * @throws IOException 467 */ 468 public void init(JobConf conf) throws IOException { 469 setConf(conf); 470 cluster = new Cluster(conf); 471 clientUgi = UserGroupInformation.getCurrentUser(); 472 } 473 474 /** 475 * Build a job client, connect to the indicated job tracker. 476 * 477 * @param jobTrackAddr the job tracker to connect to. 478 * @param conf configuration. 
479 */ 480 public JobClient(InetSocketAddress jobTrackAddr, 481 Configuration conf) throws IOException { 482 cluster = new Cluster(jobTrackAddr, conf); 483 clientUgi = UserGroupInformation.getCurrentUser(); 484 } 485 486 /** 487 * Close the <code>JobClient</code>. 488 */ 489 public synchronized void close() throws IOException { 490 cluster.close(); 491 } 492 493 /** 494 * Get a filesystem handle. We need this to prepare jobs 495 * for submission to the MapReduce system. 496 * 497 * @return the filesystem handle. 498 */ 499 public synchronized FileSystem getFs() throws IOException { 500 try { 501 return cluster.getFileSystem(); 502 } catch (InterruptedException ie) { 503 throw new IOException(ie); 504 } 505 } 506 507 /** 508 * Get a handle to the Cluster 509 */ 510 public Cluster getClusterHandle() { 511 return cluster; 512 } 513 514 /** 515 * Submit a job to the MR system. 516 * 517 * This returns a handle to the {@link RunningJob} which can be used to track 518 * the running-job. 519 * 520 * @param jobFile the job configuration. 521 * @return a handle to the {@link RunningJob} which can be used to track the 522 * running-job. 523 * @throws FileNotFoundException 524 * @throws InvalidJobConfException 525 * @throws IOException 526 */ 527 public RunningJob submitJob(String jobFile) throws FileNotFoundException, 528 InvalidJobConfException, 529 IOException { 530 // Load in the submitted job details 531 JobConf job = new JobConf(jobFile); 532 return submitJob(job); 533 } 534 535 /** 536 * Submit a job to the MR system. 537 * This returns a handle to the {@link RunningJob} which can be used to track 538 * the running-job. 539 * 540 * @param conf the job configuration. 541 * @return a handle to the {@link RunningJob} which can be used to track the 542 * running-job. 
543 * @throws FileNotFoundException 544 * @throws IOException 545 */ 546 public RunningJob submitJob(final JobConf conf) throws FileNotFoundException, 547 IOException { 548 return submitJobInternal(conf); 549 } 550 551 @InterfaceAudience.Private 552 public RunningJob submitJobInternal(final JobConf conf) 553 throws FileNotFoundException, IOException { 554 try { 555 conf.setBooleanIfUnset("mapred.mapper.new-api", false); 556 conf.setBooleanIfUnset("mapred.reducer.new-api", false); 557 Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () { 558 @Override 559 public Job run() throws IOException, ClassNotFoundException, 560 InterruptedException { 561 Job job = Job.getInstance(conf); 562 job.submit(); 563 return job; 564 } 565 }); 566 // update our Cluster instance with the one created by Job for submission 567 // (we can't pass our Cluster instance to Job, since Job wraps the config 568 // instance, and the two configs would then diverge) 569 cluster = job.getCluster(); 570 return new NetworkedJob(job); 571 } catch (InterruptedException ie) { 572 throw new IOException("interrupted", ie); 573 } 574 } 575 576 private Job getJobUsingCluster(final JobID jobid) throws IOException, 577 InterruptedException { 578 return clientUgi.doAs(new PrivilegedExceptionAction<Job>() { 579 public Job run() throws IOException, InterruptedException { 580 return cluster.getJob(jobid); 581 } 582 }); 583 } 584 /** 585 * Get an {@link RunningJob} object to track an ongoing job. Returns 586 * null if the id does not correspond to any known job. 587 * 588 * @param jobid the jobid of the job. 589 * @return the {@link RunningJob} handle to track the job, null if the 590 * <code>jobid</code> doesn't correspond to any known job. 
591 * @throws IOException 592 */ 593 public RunningJob getJob(final JobID jobid) throws IOException { 594 try { 595 596 Job job = getJobUsingCluster(jobid); 597 if (job != null) { 598 JobStatus status = JobStatus.downgrade(job.getStatus()); 599 if (status != null) { 600 return new NetworkedJob(status, cluster, 601 new JobConf(job.getConfiguration())); 602 } 603 } 604 } catch (InterruptedException ie) { 605 throw new IOException(ie); 606 } 607 return null; 608 } 609 610 /**@deprecated Applications should rather use {@link #getJob(JobID)}. 611 */ 612 @Deprecated 613 public RunningJob getJob(String jobid) throws IOException { 614 return getJob(JobID.forName(jobid)); 615 } 616 617 private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0]; 618 619 /** 620 * Get the information of the current state of the map tasks of a job. 621 * 622 * @param jobId the job to query. 623 * @return the list of all of the map tips. 624 * @throws IOException 625 */ 626 public TaskReport[] getMapTaskReports(JobID jobId) throws IOException { 627 return getTaskReports(jobId, TaskType.MAP); 628 } 629 630 private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 631 IOException { 632 try { 633 Job j = getJobUsingCluster(jobId); 634 if(j == null) { 635 return EMPTY_TASK_REPORTS; 636 } 637 return TaskReport.downgradeArray(j.getTaskReports(type)); 638 } catch (InterruptedException ie) { 639 throw new IOException(ie); 640 } 641 } 642 643 /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/ 644 @Deprecated 645 public TaskReport[] getMapTaskReports(String jobId) throws IOException { 646 return getMapTaskReports(JobID.forName(jobId)); 647 } 648 649 /** 650 * Get the information of the current state of the reduce tasks of a job. 651 * 652 * @param jobId the job to query. 653 * @return the list of all of the reduce tips. 
654 * @throws IOException 655 */ 656 public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException { 657 return getTaskReports(jobId, TaskType.REDUCE); 658 } 659 660 /** 661 * Get the information of the current state of the cleanup tasks of a job. 662 * 663 * @param jobId the job to query. 664 * @return the list of all of the cleanup tips. 665 * @throws IOException 666 */ 667 public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException { 668 return getTaskReports(jobId, TaskType.JOB_CLEANUP); 669 } 670 671 /** 672 * Get the information of the current state of the setup tasks of a job. 673 * 674 * @param jobId the job to query. 675 * @return the list of all of the setup tips. 676 * @throws IOException 677 */ 678 public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException { 679 return getTaskReports(jobId, TaskType.JOB_SETUP); 680 } 681 682 683 /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/ 684 @Deprecated 685 public TaskReport[] getReduceTaskReports(String jobId) throws IOException { 686 return getReduceTaskReports(JobID.forName(jobId)); 687 } 688 689 /** 690 * Display the information about a job's tasks, of a particular type and 691 * in a particular state 692 * 693 * @param jobId the ID of the job 694 * @param type the type of the task (map/reduce/setup/cleanup) 695 * @param state the state of the task 696 * (pending/running/completed/failed/killed) 697 */ 698 public void displayTasks(final JobID jobId, String type, String state) 699 throws IOException { 700 try { 701 Job job = getJobUsingCluster(jobId); 702 super.displayTasks(job, type, state); 703 } catch (InterruptedException ie) { 704 throw new IOException(ie); 705 } 706 } 707 708 /** 709 * Get status information about the Map-Reduce cluster. 710 * 711 * @return the status information about the Map-Reduce cluster as an object 712 * of {@link ClusterStatus}. 
713 * @throws IOException 714 */ 715 public ClusterStatus getClusterStatus() throws IOException { 716 try { 717 return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() { 718 public ClusterStatus run() throws IOException, InterruptedException { 719 ClusterMetrics metrics = cluster.getClusterStatus(); 720 return new ClusterStatus(metrics.getTaskTrackerCount(), metrics 721 .getBlackListedTaskTrackerCount(), cluster 722 .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(), 723 metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(), 724 metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(), 725 metrics.getDecommissionedTaskTrackerCount(), metrics 726 .getGrayListedTaskTrackerCount()); 727 } 728 }); 729 } catch (InterruptedException ie) { 730 throw new IOException(ie); 731 } 732 } 733 734 private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) { 735 Collection<String> list = new ArrayList<String>(); 736 for (TaskTrackerInfo info: objs) { 737 list.add(info.getTaskTrackerName()); 738 } 739 return list; 740 } 741 742 private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) { 743 Collection<BlackListInfo> list = new ArrayList<BlackListInfo>(); 744 for (TaskTrackerInfo info: objs) { 745 BlackListInfo binfo = new BlackListInfo(); 746 binfo.setTrackerName(info.getTaskTrackerName()); 747 binfo.setReasonForBlackListing(info.getReasonForBlacklist()); 748 binfo.setBlackListReport(info.getBlacklistReport()); 749 list.add(binfo); 750 } 751 return list; 752 } 753 754 /** 755 * Get status information about the Map-Reduce cluster. 756 * 757 * @param detailed if true then get a detailed status including the 758 * tracker names 759 * @return the status information about the Map-Reduce cluster as an object 760 * of {@link ClusterStatus}. 
761 * @throws IOException 762 */ 763 public ClusterStatus getClusterStatus(boolean detailed) throws IOException { 764 try { 765 return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() { 766 public ClusterStatus run() throws IOException, InterruptedException { 767 ClusterMetrics metrics = cluster.getClusterStatus(); 768 return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()), 769 arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()), 770 cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(), 771 metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(), 772 metrics.getReduceSlotCapacity(), 773 cluster.getJobTrackerStatus()); 774 } 775 }); 776 } catch (InterruptedException ie) { 777 throw new IOException(ie); 778 } 779 } 780 781 782 /** 783 * Get the jobs that are not completed and not failed. 784 * 785 * @return array of {@link JobStatus} for the running/to-be-run jobs. 786 * @throws IOException 787 */ 788 public JobStatus[] jobsToComplete() throws IOException { 789 List<JobStatus> stats = new ArrayList<JobStatus>(); 790 for (JobStatus stat : getAllJobs()) { 791 if (!stat.isJobComplete()) { 792 stats.add(stat); 793 } 794 } 795 return stats.toArray(new JobStatus[0]); 796 } 797 798 /** 799 * Get the jobs that are submitted. 800 * 801 * @return array of {@link JobStatus} for the submitted jobs. 
802 * @throws IOException 803 */ 804 public JobStatus[] getAllJobs() throws IOException { 805 try { 806 org.apache.hadoop.mapreduce.JobStatus[] jobs = 807 clientUgi.doAs(new PrivilegedExceptionAction< 808 org.apache.hadoop.mapreduce.JobStatus[]> () { 809 public org.apache.hadoop.mapreduce.JobStatus[] run() 810 throws IOException, InterruptedException { 811 return cluster.getAllJobStatuses(); 812 } 813 }); 814 JobStatus[] stats = new JobStatus[jobs.length]; 815 for (int i = 0; i < jobs.length; i++) { 816 stats[i] = JobStatus.downgrade(jobs[i]); 817 } 818 return stats; 819 } catch (InterruptedException ie) { 820 throw new IOException(ie); 821 } 822 } 823 824 /** 825 * Utility that submits a job, then polls for progress until the job is 826 * complete. 827 * 828 * @param job the job configuration. 829 * @throws IOException if the job fails 830 */ 831 public static RunningJob runJob(JobConf job) throws IOException { 832 JobClient jc = new JobClient(job); 833 RunningJob rj = jc.submitJob(job); 834 try { 835 if (!jc.monitorAndPrintJob(job, rj)) { 836 throw new IOException("Job failed!"); 837 } 838 } catch (InterruptedException ie) { 839 Thread.currentThread().interrupt(); 840 } 841 return rj; 842 } 843 844 /** 845 * Monitor a job and print status in real-time as progress is made and tasks 846 * fail. 
847 * @param conf the job's configuration 848 * @param job the job to track 849 * @return true if the job succeeded 850 * @throws IOException if communication to the JobTracker fails 851 */ 852 public boolean monitorAndPrintJob(JobConf conf, 853 RunningJob job 854 ) throws IOException, InterruptedException { 855 return ((NetworkedJob)job).monitorAndPrintJob(); 856 } 857 858 static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 859 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 860 } 861 862 static Configuration getConfiguration(String jobTrackerSpec) 863 { 864 Configuration conf = new Configuration(); 865 if (jobTrackerSpec != null) { 866 if (jobTrackerSpec.indexOf(":") >= 0) { 867 conf.set("mapred.job.tracker", jobTrackerSpec); 868 } else { 869 String classpathFile = "hadoop-" + jobTrackerSpec + ".xml"; 870 URL validate = conf.getResource(classpathFile); 871 if (validate == null) { 872 throw new RuntimeException(classpathFile + " not found on CLASSPATH"); 873 } 874 conf.addResource(classpathFile); 875 } 876 } 877 return conf; 878 } 879 880 /** 881 * Sets the output filter for tasks. only those tasks are printed whose 882 * output matches the filter. 883 * @param newValue task filter. 884 */ 885 @Deprecated 886 public void setTaskOutputFilter(TaskStatusFilter newValue){ 887 this.taskOutputFilter = newValue; 888 } 889 890 /** 891 * Get the task output filter out of the JobConf. 892 * 893 * @param job the JobConf to examine. 894 * @return the filter level. 895 */ 896 public static TaskStatusFilter getTaskOutputFilter(JobConf job) { 897 return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 898 "FAILED")); 899 } 900 901 /** 902 * Modify the JobConf to set the task output filter. 903 * 904 * @param job the JobConf to modify. 905 * @param newValue the value to set. 
906 */ 907 public static void setTaskOutputFilter(JobConf job, 908 TaskStatusFilter newValue) { 909 job.set("jobclient.output.filter", newValue.toString()); 910 } 911 912 /** 913 * Returns task output filter. 914 * @return task filter. 915 */ 916 @Deprecated 917 public TaskStatusFilter getTaskOutputFilter(){ 918 return this.taskOutputFilter; 919 } 920 921 protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs, 922 String counterGroupName, String counterName) throws IOException { 923 Counters counters = Counters.downgrade(cntrs); 924 return counters.findCounter(counterGroupName, counterName).getValue(); 925 } 926 927 /** 928 * Get status information about the max available Maps in the cluster. 929 * 930 * @return the max available Maps in the cluster 931 * @throws IOException 932 */ 933 public int getDefaultMaps() throws IOException { 934 try { 935 return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() { 936 @Override 937 public Integer run() throws IOException, InterruptedException { 938 return cluster.getClusterStatus().getMapSlotCapacity(); 939 } 940 }); 941 } catch (InterruptedException ie) { 942 throw new IOException(ie); 943 } 944 } 945 946 /** 947 * Get status information about the max available Reduces in the cluster. 948 * 949 * @return the max available Reduces in the cluster 950 * @throws IOException 951 */ 952 public int getDefaultReduces() throws IOException { 953 try { 954 return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() { 955 @Override 956 public Integer run() throws IOException, InterruptedException { 957 return cluster.getClusterStatus().getReduceSlotCapacity(); 958 } 959 }); 960 } catch (InterruptedException ie) { 961 throw new IOException(ie); 962 } 963 } 964 965 /** 966 * Grab the jobtracker system directory path where job-specific files are to be placed. 967 * 968 * @return the system directory where job-specific files are to be placed. 
969 */ 970 public Path getSystemDir() { 971 try { 972 return clientUgi.doAs(new PrivilegedExceptionAction<Path>() { 973 @Override 974 public Path run() throws IOException, InterruptedException { 975 return cluster.getSystemDir(); 976 } 977 }); 978 } catch (IOException ioe) { 979 return null; 980 } catch (InterruptedException ie) { 981 return null; 982 } 983 } 984 985 /** 986 * Checks if the job directory is clean and has all the required components 987 * for (re) starting the job 988 */ 989 public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 990 throws IOException { 991 FileStatus[] contents = fs.listStatus(jobDirPath); 992 int matchCount = 0; 993 if (contents != null && contents.length >= 2) { 994 for (FileStatus status : contents) { 995 if ("job.xml".equals(status.getPath().getName())) { 996 ++matchCount; 997 } 998 if ("job.split".equals(status.getPath().getName())) { 999 ++matchCount; 1000 } 1001 } 1002 if (matchCount == 2) { 1003 return true; 1004 } 1005 } 1006 return false; 1007 } 1008 1009 /** 1010 * Fetch the staging area directory for the application 1011 * 1012 * @return path to staging area directory 1013 * @throws IOException 1014 */ 1015 public Path getStagingAreaDir() throws IOException { 1016 try { 1017 return clientUgi.doAs(new PrivilegedExceptionAction<Path>() { 1018 @Override 1019 public Path run() throws IOException, InterruptedException { 1020 return cluster.getStagingAreaDir(); 1021 } 1022 }); 1023 } catch (InterruptedException ie) { 1024 // throw RuntimeException instead for compatibility reasons 1025 throw new RuntimeException(ie); 1026 } 1027 } 1028 1029 private JobQueueInfo getJobQueueInfo(QueueInfo queue) { 1030 JobQueueInfo ret = new JobQueueInfo(queue); 1031 // make sure to convert any children 1032 if (queue.getQueueChildren().size() > 0) { 1033 List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue 1034 .getQueueChildren().size()); 1035 for (QueueInfo child : queue.getQueueChildren()) { 1036 
childQueues.add(getJobQueueInfo(child)); 1037 } 1038 ret.setChildren(childQueues); 1039 } 1040 return ret; 1041 } 1042 1043 private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues) 1044 throws IOException { 1045 JobQueueInfo[] ret = new JobQueueInfo[queues.length]; 1046 for (int i = 0; i < queues.length; i++) { 1047 ret[i] = getJobQueueInfo(queues[i]); 1048 } 1049 return ret; 1050 } 1051 1052 /** 1053 * Returns an array of queue information objects about root level queues 1054 * configured 1055 * 1056 * @return the array of root level JobQueueInfo objects 1057 * @throws IOException 1058 */ 1059 public JobQueueInfo[] getRootQueues() throws IOException { 1060 try { 1061 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() { 1062 public JobQueueInfo[] run() throws IOException, InterruptedException { 1063 return getJobQueueInfoArray(cluster.getRootQueues()); 1064 } 1065 }); 1066 } catch (InterruptedException ie) { 1067 throw new IOException(ie); 1068 } 1069 } 1070 1071 /** 1072 * Returns an array of queue information objects about immediate children 1073 * of queue queueName. 1074 * 1075 * @param queueName 1076 * @return the array of immediate children JobQueueInfo objects 1077 * @throws IOException 1078 */ 1079 public JobQueueInfo[] getChildQueues(final String queueName) throws IOException { 1080 try { 1081 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() { 1082 public JobQueueInfo[] run() throws IOException, InterruptedException { 1083 return getJobQueueInfoArray(cluster.getChildQueues(queueName)); 1084 } 1085 }); 1086 } catch (InterruptedException ie) { 1087 throw new IOException(ie); 1088 } 1089 } 1090 1091 /** 1092 * Return an array of queue information objects about all the Job Queues 1093 * configured. 
1094 * 1095 * @return Array of JobQueueInfo objects 1096 * @throws IOException 1097 */ 1098 public JobQueueInfo[] getQueues() throws IOException { 1099 try { 1100 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() { 1101 public JobQueueInfo[] run() throws IOException, InterruptedException { 1102 return getJobQueueInfoArray(cluster.getQueues()); 1103 } 1104 }); 1105 } catch (InterruptedException ie) { 1106 throw new IOException(ie); 1107 } 1108 } 1109 1110 /** 1111 * Gets all the jobs which were added to particular Job Queue 1112 * 1113 * @param queueName name of the Job Queue 1114 * @return Array of jobs present in the job queue 1115 * @throws IOException 1116 */ 1117 1118 public JobStatus[] getJobsFromQueue(final String queueName) throws IOException { 1119 try { 1120 QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() { 1121 @Override 1122 public QueueInfo run() throws IOException, InterruptedException { 1123 return cluster.getQueue(queueName); 1124 } 1125 }); 1126 if (queue == null) { 1127 return null; 1128 } 1129 org.apache.hadoop.mapreduce.JobStatus[] stats = 1130 queue.getJobStatuses(); 1131 JobStatus[] ret = new JobStatus[stats.length]; 1132 for (int i = 0 ; i < stats.length; i++ ) { 1133 ret[i] = JobStatus.downgrade(stats[i]); 1134 } 1135 return ret; 1136 } catch (InterruptedException ie) { 1137 throw new IOException(ie); 1138 } 1139 } 1140 1141 /** 1142 * Gets the queue information associated to a particular Job Queue 1143 * 1144 * @param queueName name of the job queue. 1145 * @return Queue information associated to particular queue. 
1146 * @throws IOException 1147 */ 1148 public JobQueueInfo getQueueInfo(final String queueName) throws IOException { 1149 try { 1150 QueueInfo queueInfo = clientUgi.doAs(new 1151 PrivilegedExceptionAction<QueueInfo>() { 1152 public QueueInfo run() throws IOException, InterruptedException { 1153 return cluster.getQueue(queueName); 1154 } 1155 }); 1156 if (queueInfo != null) { 1157 return new JobQueueInfo(queueInfo); 1158 } 1159 return null; 1160 } catch (InterruptedException ie) { 1161 throw new IOException(ie); 1162 } 1163 } 1164 1165 /** 1166 * Gets the Queue ACLs for current user 1167 * @return array of QueueAclsInfo object for current user. 1168 * @throws IOException 1169 */ 1170 public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException { 1171 try { 1172 org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 1173 clientUgi.doAs(new 1174 PrivilegedExceptionAction 1175 <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() { 1176 public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 1177 throws IOException, InterruptedException { 1178 return cluster.getQueueAclsForCurrentUser(); 1179 } 1180 }); 1181 QueueAclsInfo[] ret = new QueueAclsInfo[acls.length]; 1182 for (int i = 0 ; i < acls.length; i++ ) { 1183 ret[i] = QueueAclsInfo.downgrade(acls[i]); 1184 } 1185 return ret; 1186 } catch (InterruptedException ie) { 1187 throw new IOException(ie); 1188 } 1189 } 1190 1191 /** 1192 * Get a delegation token for the user from the JobTracker. 
1193 * @param renewer the user who can renew the token 1194 * @return the new token 1195 * @throws IOException 1196 */ 1197 public Token<DelegationTokenIdentifier> 1198 getDelegationToken(final Text renewer) throws IOException, InterruptedException { 1199 return clientUgi.doAs(new 1200 PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() { 1201 public Token<DelegationTokenIdentifier> run() throws IOException, 1202 InterruptedException { 1203 return cluster.getDelegationToken(renewer); 1204 } 1205 }); 1206 } 1207 1208 /** 1209 * Renew a delegation token 1210 * @param token the token to renew 1211 * @return true if the renewal went well 1212 * @throws InvalidToken 1213 * @throws IOException 1214 * @deprecated Use {@link Token#renew} instead 1215 */ 1216 public long renewDelegationToken(Token<DelegationTokenIdentifier> token 1217 ) throws InvalidToken, IOException, 1218 InterruptedException { 1219 return token.renew(getConf()); 1220 } 1221 1222 /** 1223 * Cancel a delegation token from the JobTracker 1224 * @param token the token to cancel 1225 * @throws IOException 1226 * @deprecated Use {@link Token#cancel} instead 1227 */ 1228 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token 1229 ) throws InvalidToken, IOException, 1230 InterruptedException { 1231 token.cancel(getConf()); 1232 } 1233 1234 /** 1235 */ 1236 public static void main(String argv[]) throws Exception { 1237 int res = ToolRunner.run(new JobClient(), argv); 1238 System.exit(res); 1239 } 1240} 1241