/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.tools;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogDumper;

/**
 * Interprets the map reduce cli options.
 *
 * <p>{@link #run(String[])} parses a single sub-command (for example
 * {@code -status}, {@code -kill} or {@code -list}) together with its
 * arguments, connects to the cluster and executes the request. Results are
 * written to {@code System.out}; usage messages go to {@code System.err}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CLI extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CLI.class);

  /** Cluster handle; created in {@link #run(String[])} and closed there. */
  protected Cluster cluster;

  /** Task types accepted by {@code -list-attempt-ids} (upper case). */
  private static final Set<String> taskTypes = new HashSet<String>(
      Arrays.asList("MAP", "REDUCE"));

  /** Task states accepted by {@code -list-attempt-ids} (lower case). */
  private static final Set<String> taskStates = new HashSet<String>(
      Arrays.asList("running", "completed", "pending", "failed", "killed"));

  public CLI() {
  }

  public CLI(Configuration conf) {
    setConf(conf);
  }

  /**
   * Parses the command line, then executes the selected sub-command against
   * the cluster.
   *
   * @param argv the sub-command followed by its arguments
   * @return 0 when the request succeeded; -1 when the arguments were invalid
   *         or the request failed; for {@code -logs} the value returned by
   *         the log dumper
   * @throws Exception if the cluster request fails; a {@link RemoteException}
   *         wrapping an {@link AccessControlException} is reported on
   *         {@code System.out} instead of being rethrown
   */
  @Override
  public int run(String[] argv) throws Exception {
    int exitCode = -1;
    if (argv.length < 1) {
      displayUsage("");
      return exitCode;
    }
    // process arguments
    String cmd = argv[0];
    String submitJobFile = null;
    String jobid = null;
    String taskid = null;
    String historyFile = null;
    String counterGroupName = null;
    String counterName = null;
    JobPriority jp = null;
    String taskType = null;
    String taskState = null;
    int fromEvent = 0;
    int nEvents = 0;
    boolean getStatus = false;
    boolean getCounter = false;
    boolean killJob = false;
    boolean listEvents = false;
    boolean viewHistory = false;
    boolean viewAllHistory = false;
    boolean listJobs = false;
    boolean listAllJobs = false;
    boolean listActiveTrackers = false;
    boolean listBlacklistedTrackers = false;
    boolean displayTasks = false;
    boolean killTask = false;
    boolean failTask = false;
    boolean setJobPriority = false;
    boolean logs = false;

    if ("-submit".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      submitJobFile = argv[1];
    } else if ("-status".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      getStatus = true;
    } else if ("-counter".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      getCounter = true;
      jobid = argv[1];
      counterGroupName = argv[2];
      counterName = argv[3];
    } else if ("-kill".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      killJob = true;
    } else if ("-set-priority".equals(cmd)) {
      if (argv.length != 3) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      try {
        jp = JobPriority.valueOf(argv[2]);
      } catch (IllegalArgumentException iae) {
        LOG.info(iae);
        displayUsage(cmd);
        return exitCode;
      }
      setJobPriority = true;
    } else if ("-events".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      try {
        fromEvent = Integer.parseInt(argv[2]);
        nEvents = Integer.parseInt(argv[3]);
      } catch (NumberFormatException nfe) {
        // Treat a non-numeric event index/count like any other bad argument
        // instead of letting the exception escape with a stack trace.
        LOG.info(nfe);
        displayUsage(cmd);
        return exitCode;
      }
      listEvents = true;
    } else if ("-history".equals(cmd)) {
      if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
        displayUsage(cmd);
        return exitCode;
      }
      viewHistory = true;
      if (argv.length == 3 && "all".equals(argv[1])) {
        viewAllHistory = true;
        historyFile = argv[2];
      } else {
        historyFile = argv[1];
      }
    } else if ("-list".equals(cmd)) {
      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
        displayUsage(cmd);
        return exitCode;
      }
      if (argv.length == 2 && "all".equals(argv[1])) {
        listAllJobs = true;
      } else {
        listJobs = true;
      }
    } else if ("-kill-task".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      killTask = true;
      taskid = argv[1];
    } else if ("-fail-task".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      failTask = true;
      taskid = argv[1];
    } else if ("-list-active-trackers".equals(cmd)) {
      if (argv.length != 1) {
        displayUsage(cmd);
        return exitCode;
      }
      listActiveTrackers = true;
    } else if ("-list-blacklisted-trackers".equals(cmd)) {
      if (argv.length != 1) {
        displayUsage(cmd);
        return exitCode;
      }
      listBlacklistedTrackers = true;
    } else if ("-list-attempt-ids".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      taskType = argv[2];
      taskState = argv[3];
      displayTasks = true;
      // Fold case with a fixed locale so the lookup does not depend on the
      // JVM default locale (e.g. Turkish dotted/dotless i).
      if (!taskTypes.contains(taskType.toUpperCase(Locale.ENGLISH))) {
        System.out.println("Error: Invalid task-type: " + taskType);
        displayUsage(cmd);
        return exitCode;
      }
      if (!taskStates.contains(taskState.toLowerCase(Locale.ENGLISH))) {
        System.out.println("Error: Invalid task-state: " + taskState);
        displayUsage(cmd);
        return exitCode;
      }
    } else if ("-logs".equals(cmd)) {
      if (argv.length == 2 || argv.length == 3) {
        logs = true;
        jobid = argv[1];
        if (argv.length == 3) {
          taskid = argv[2];
        } else {
          taskid = null;
        }
      } else {
        displayUsage(cmd);
        return exitCode;
      }
    } else {
      displayUsage(cmd);
      return exitCode;
    }

    // initialize cluster
    cluster = createCluster();

    // Submit the request
    try {
      if (submitJobFile != null) {
        Job job = Job.getInstance(new JobConf(submitJobFile));
        job.submit();
        System.out.println("Created job " + job.getJobID());
        exitCode = 0;
      } else if (getStatus) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          Counters counters = job.getCounters();
          System.out.println();
          System.out.println(job);
          if (counters != null) {
            System.out.println(counters);
          } else {
            System.out.println("Counters not available. Job is retired.");
          }
          exitCode = 0;
        }
      } else if (getCounter) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          Counters counters = job.getCounters();
          if (counters == null) {
            System.out.println("Counters not available for retired job " +
                jobid);
            exitCode = -1;
          } else {
            System.out.println(getCounter(counters,
                counterGroupName, counterName));
            exitCode = 0;
          }
        }
      } else if (killJob) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          job.killJob();
          System.out.println("Killed job " + jobid);
          exitCode = 0;
        }
      } else if (setJobPriority) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          job.setPriority(jp);
          System.out.println("Changed job priority.");
          exitCode = 0;
        }
      } else if (viewHistory) {
        viewHistory(historyFile, viewAllHistory);
        exitCode = 0;
      } else if (listEvents) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          // Same handling as the other job-scoped commands; avoids
          // dereferencing a null job inside listEvents.
          System.out.println("Could not find job " + jobid);
        } else {
          listEvents(job, fromEvent, nEvents);
          exitCode = 0;
        }
      } else if (listJobs) {
        listJobs(cluster);
        exitCode = 0;
      } else if (listAllJobs) {
        listAllJobs(cluster);
        exitCode = 0;
      } else if (listActiveTrackers) {
        listActiveTrackers(cluster);
        exitCode = 0;
      } else if (listBlacklistedTrackers) {
        listBlacklistedTrackers(cluster);
        exitCode = 0;
      } else if (displayTasks) {
        Job job = cluster.getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          displayTasks(job, taskType, taskState);
          exitCode = 0;
        }
      } else if (killTask) {
        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
        Job job = cluster.getJob(taskID.getJobID());
        if (job == null) {
          // jobid is never set for -kill-task; report the id derived from
          // the task attempt instead of printing "null".
          System.out.println("Could not find job " + taskID.getJobID());
        } else if (job.killTask(taskID)) {
          System.out.println("Killed task " + taskid);
          exitCode = 0;
        } else {
          System.out.println("Could not kill task " + taskid);
          exitCode = -1;
        }
      } else if (failTask) {
        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
        Job job = cluster.getJob(taskID.getJobID());
        if (job == null) {
          // jobid is never set for -fail-task; see -kill-task above.
          System.out.println("Could not find job " + taskID.getJobID());
        } else if (job.failTask(taskID)) {
          System.out.println("Killed task " + taskID + " by failing it");
          exitCode = 0;
        } else {
          System.out.println("Could not fail task " + taskid);
          exitCode = -1;
        }
      } else if (logs) {
        try {
          JobID jobID = JobID.forName(jobid);
          // taskid is null when only a job id was supplied on the command
          // line.
          TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
          LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
          LogDumper logDumper = new LogDumper();
          logDumper.setConf(getConf());
          exitCode = logDumper.dumpAContainersLogs(
              logParams.getApplicationId(), logParams.getContainerId(),
              logParams.getNodeId(), logParams.getOwner());
        } catch (IOException e) {
          // Remote failures are handled by the outer catch; anything else is
          // only reported and leaves exitCode at its failure value.
          if (e instanceof RemoteException) {
            throw e;
          }
          System.out.println(e.getMessage());
        }
      }
    } catch (RemoteException re) {
      IOException unwrappedException = re.unwrapRemoteException();
      if (unwrappedException instanceof AccessControlException) {
        System.out.println(unwrappedException.getMessage());
      } else {
        throw re;
      }
    } finally {
      cluster.close();
    }
    return exitCode;
  }

  /**
   * Creates the cluster handle used by {@link #run(String[])}, built from
   * this tool's configuration.
   */
  Cluster createCluster() throws IOException {
    return new Cluster(getConf());
  }

  /** Returns all {@link JobPriority} names separated by single spaces. */
  private String getJobPriorityNames() {
    StringBuilder sb = new StringBuilder();
    for (JobPriority p : JobPriority.values()) {
      sb.append(p.name()).append(" ");
    }
    // Drop the trailing separator.
    return sb.substring(0, sb.length() - 1);
  }

  /** Returns the accepted task types separated by single spaces. */
  private String getTaskTypes() {
    return StringUtils.join(taskTypes, " ");
  }

  /**
   * Display usage of the command-line tool on {@code System.err}.
   *
   * @param cmd the sub-command whose usage should be shown; any unrecognized
   *        value (including the empty string) prints the usage of every
   *        sub-command
   */
  private void displayUsage(String cmd) {
    String prefix = "Usage: CLI ";
    String jobPriorityValues = getJobPriorityNames();
    // Must list every state accepted by the -list-attempt-ids validation.
    String taskStateValues = "pending, running, completed, failed, killed";

    if ("-submit".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-file>]");
    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-id>]");
    } else if ("-counter".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <group-name> <counter-name>]");
    } else if ("-events".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
    } else if ("-history".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile>]");
    } else if ("-list".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " [all]]");
    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
    } else if ("-set-priority".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
          "Valid values for priorities are: "
          + jobPriorityValues);
    } else if ("-list-active-trackers".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + "]");
    } else if ("-list-blacklisted-trackers".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + "]");
    } else if ("-list-attempt-ids".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <task-type> <task-state>]. " +
          "Valid values for <task-type> are " + getTaskTypes() + ". " +
          "Valid values for <task-state> are " + taskStateValues);
    } else if ("-logs".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <task-attempt-id>]. " +
          " <task-attempt-id> is optional to get task attempt logs.");
    } else {
      System.err.printf(prefix + "<command> <args>\n");
      System.err.printf("\t[-submit <job-file>]\n");
      System.err.printf("\t[-status <job-id>]\n");
      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
      System.err.printf("\t[-kill <job-id>]\n");
      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
          "Valid values for priorities are: " + jobPriorityValues + "\n");
      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
      System.err.printf("\t[-history [all] <jobHistoryFile>]\n");
      System.err.printf("\t[-list [all]]\n");
      System.err.printf("\t[-list-active-trackers]\n");
      System.err.printf("\t[-list-blacklisted-trackers]\n");
      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
          "<task-state>]. " +
          "Valid values for <task-type> are " + getTaskTypes() + ". " +
          "Valid values for <task-state> are " + taskStateValues);
      System.err.printf("\t[-kill-task <task-attempt-id>]\n");
      System.err.printf("\t[-fail-task <task-attempt-id>]\n");
      System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
      ToolRunner.printGenericCommandUsage(System.out);
    }
  }

  /**
   * Prints the contents of a job history file.
   *
   * @param historyFile the history file to display
   * @param all the "all" flag handed to the {@link HistoryViewer}
   * @throws IOException if the viewer cannot be created or printed
   */
  private void viewHistory(String historyFile, boolean all)
      throws IOException {
    HistoryViewer historyViewer = new HistoryViewer(historyFile,
        getConf(), all);
    historyViewer.print();
  }

  /**
   * Looks up the value of a single counter.
   *
   * @param counters the counters of a job
   * @param counterGroupName the group the counter belongs to
   * @param counterName the name of the counter
   * @return the counter's current value
   * @throws IOException declared for subclasses/callers; not thrown by the
   *         visible implementation
   */
  protected long getCounter(Counters counters, String counterGroupName,
      String counterName) throws IOException {
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * List the task completion events for the given job.
   *
   * @param job the job whose events are listed; must not be null
   * @param fromEventId the first event to fetch
   * @param numEvents the maximum number of events to fetch
   * @throws IOException
   * @throws InterruptedException
   */
  private void listEvents(Job job, int fromEventId, int numEvents)
      throws IOException, InterruptedException {
    TaskCompletionEvent[] events = job.
        getTaskCompletionEvents(fromEventId, numEvents);
    System.out.println("Task completion events for " + job.getJobID());
    System.out.println("Number of events (from " + fromEventId + ") are: "
        + events.length);
    for (TaskCompletionEvent event : events) {
      System.out.println(event.getStatus() + " " +
          event.getTaskAttemptId() + " " +
          getTaskLogURL(event.getTaskAttemptId(),
              event.getTaskTrackerHttp()));
    }
  }

  /**
   * Builds the plain-text task log URL for a task attempt.
   *
   * @param taskId the task attempt
   * @param baseUrl the URL prefix the task log path is appended to
   * @return {@code baseUrl} followed by the task log query for
   *         {@code taskId}
   */
  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  /**
   * Dump a list of currently running jobs, i.e. every job the cluster
   * reports whose status is not complete.
   *
   * @throws IOException
   */
  private void listJobs(Cluster cluster)
      throws IOException, InterruptedException {
    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
    for (JobStatus job : cluster.getAllJobStatuses()) {
      if (!job.isJobComplete()) {
        runningJobs.add(job);
      }
    }
    displayJobList(runningJobs.toArray(new JobStatus[0]));
  }

  /**
   * Dump a list of all jobs submitted.
   *
   * @throws IOException
   */
  private void listAllJobs(Cluster cluster)
      throws IOException, InterruptedException {
    displayJobList(cluster.getAllJobStatuses());
  }

  /**
   * Display the list of active trackers, one tracker name per line.
   */
  private void listActiveTrackers(Cluster cluster)
      throws IOException, InterruptedException {
    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
    for (TaskTrackerInfo tracker : trackers) {
      System.out.println(tracker.getTaskTrackerName());
    }
  }

  /**
   * Display the list of blacklisted trackers with the reason for each. The
   * header line is only printed when at least one tracker is blacklisted.
   */
  private void listBlacklistedTrackers(Cluster cluster)
      throws IOException, InterruptedException {
    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
    if (trackers.length > 0) {
      System.out.println("BlackListedNode \t Reason");
    }
    for (TaskTrackerInfo tracker : trackers) {
      System.out.println(tracker.getTaskTrackerName() + "\t" +
          tracker.getReasonForBlacklist());
    }
  }

  /**
   * Prints the attempt ids of a task: the successful attempt for a
   * completed task, every running attempt for a running task. Nothing is
   * printed for tasks in any other status.
   */
  private void printTaskAttempts(TaskReport report) {
    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
      System.out.println(report.getSuccessfulTaskAttemptId());
    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
      for (TaskAttemptID t :
          report.getRunningTaskAttemptIds()) {
        System.out.println(t);
      }
    }
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   *
   * @param job the job
   * @param type the type of the task, matched case-insensitively against
   *        the {@link TaskType} constants
   * @param state the state of the task
   *        (pending/running/completed/failed/killed), case-insensitive
   */
  protected void displayTasks(Job job, String type, String state)
      throws IOException, InterruptedException {
    TaskReport[] reports =
        job.getTaskReports(TaskType.valueOf(type.toUpperCase(Locale.ENGLISH)));
    for (TaskReport report : reports) {
      TIPStatus status = report.getCurrentStatus();
      if ((state.equalsIgnoreCase("pending") && status == TIPStatus.PENDING) ||
          (state.equalsIgnoreCase("running") && status == TIPStatus.RUNNING) ||
          (state.equalsIgnoreCase("completed")
              && status == TIPStatus.COMPLETE) ||
          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
        printTaskAttempts(report);
      }
    }
  }

  /**
   * Prints the given jobs as a table on {@code System.out}.
   *
   * @param jobs the job statuses to display
   */
  public void displayJobList(JobStatus[] jobs)
      throws IOException, InterruptedException {
    displayJobList(jobs, new PrintWriter(System.out));
  }

  /** Format of the header row written by {@code displayJobList}. */
  @Private
  public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
  /** Format of each job row written by {@code displayJobList}. */
  @Private
  public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
  /** Format used to render a memory amount in the job table. */
  private static final String memPattern = "%dM";
  /** Placeholder shown for negative slot and memory figures. */
  private static final String UNAVAILABLE = "N/A";

  /**
   * Writes the given jobs as a table to {@code writer} and flushes it. Slot
   * and memory figures that are negative are shown as "N/A".
   *
   * @param jobs the job statuses to display
   * @param writer the destination; flushed but not closed
   */
  @Private
  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
    writer.println("Total jobs:" + jobs.length);
    writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
        "Queue", "Priority", "UsedContainers",
        "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
    for (JobStatus job : jobs) {
      int numUsedSlots = job.getNumUsedSlots();
      int numReservedSlots = job.getNumReservedSlots();
      int usedMem = job.getUsedMem();
      int rsvdMem = job.getReservedMem();
      int neededMem = job.getNeededMem();
      writer.printf(dataPattern,
          job.getJobID().toString(), job.getState(), job.getStartTime(),
          job.getUsername(), job.getQueue(),
          job.getPriority().name(),
          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
          job.getSchedulingInfo());
    }
    writer.flush();
  }

  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new CLI(), argv);
    ExitUtil.terminate(res);
  }
}