001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapreduce.tools; 019 020import java.io.IOException; 021import java.io.OutputStreamWriter; 022import java.io.PrintWriter; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028 029import org.apache.commons.lang.StringUtils; 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.apache.hadoop.classification.InterfaceAudience; 033import org.apache.hadoop.classification.InterfaceStability; 034import org.apache.hadoop.classification.InterfaceAudience.Private; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.conf.Configured; 037import org.apache.hadoop.ipc.RemoteException; 038import org.apache.hadoop.mapred.JobConf; 039import org.apache.hadoop.mapred.TIPStatus; 040import org.apache.hadoop.mapreduce.Cluster; 041import org.apache.hadoop.mapreduce.Counters; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.JobID; 044import org.apache.hadoop.mapreduce.JobPriority; 045import org.apache.hadoop.mapreduce.JobStatus; 046import org.apache.hadoop.mapreduce.TaskAttemptID; 047import org.apache.hadoop.mapreduce.TaskCompletionEvent; 048import org.apache.hadoop.mapreduce.TaskReport; 049import org.apache.hadoop.mapreduce.TaskTrackerInfo; 050import org.apache.hadoop.mapreduce.TaskType; 051import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 052import org.apache.hadoop.mapreduce.v2.LogParams; 053import org.apache.hadoop.security.AccessControlException; 054import org.apache.hadoop.util.ExitUtil; 055import org.apache.hadoop.util.Tool; 056import org.apache.hadoop.util.ToolRunner; 057import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; 058 059import com.google.common.base.Charsets; 060 061/** 062 * Interprets the map reduce cli options 063 */ 064@InterfaceAudience.Public 065@InterfaceStability.Stable 066public class CLI extends Configured implements Tool { 067 private static final Log LOG = LogFactory.getLog(CLI.class); 068 protected Cluster cluster; 069 private static final Set<String> taskTypes = new HashSet<String>( 070 Arrays.asList("MAP", "REDUCE")); 071 private final Set<String> taskStates = new HashSet<String>(Arrays.asList( 072 "running", "completed", "pending", "failed", "killed")); 073 074 public CLI() { 075 } 076 077 public CLI(Configuration conf) { 078 setConf(conf); 079 } 080 081 public int run(String[] argv) throws Exception { 082 int exitCode = -1; 083 if (argv.length < 1) { 084 displayUsage(""); 085 return exitCode; 086 } 087 // process arguments 088 String cmd = argv[0]; 089 String submitJobFile = null; 090 String jobid = null; 091 String taskid = null; 092 String historyFile = null; 093 String counterGroupName = null; 094 String counterName = null; 095 JobPriority jp = null; 096 String taskType = null; 097 String taskState = null; 098 int fromEvent = 0; 099 int nEvents = 0; 100 boolean getStatus = false; 101 boolean getCounter = false; 102 boolean killJob = false; 103 boolean listEvents = false; 104 boolean viewHistory = false; 105 boolean viewAllHistory = false; 106 boolean listJobs = false; 107 boolean listAllJobs = false; 108 boolean listActiveTrackers = false; 109 boolean listBlacklistedTrackers = false; 110 boolean displayTasks = false; 111 boolean killTask = false; 112 boolean failTask = false; 113 boolean setJobPriority = false; 114 boolean logs = false; 115 116 if ("-submit".equals(cmd)) { 117 if (argv.length != 2) { 118 displayUsage(cmd); 119 return exitCode; 120 } 121 submitJobFile = argv[1]; 122 } else if ("-status".equals(cmd)) { 123 if (argv.length != 2) { 124 displayUsage(cmd); 125 return exitCode; 126 } 127 jobid = argv[1]; 128 getStatus = true; 129 } else if("-counter".equals(cmd)) { 130 if (argv.length != 4) { 131 displayUsage(cmd); 132 return exitCode; 133 } 134 getCounter = true; 135 jobid = argv[1]; 136 counterGroupName = argv[2]; 137 counterName = argv[3]; 138 } else if ("-kill".equals(cmd)) { 139 if (argv.length != 2) { 140 displayUsage(cmd); 141 return exitCode; 142 } 143 jobid = argv[1]; 144 killJob = true; 145 } else if ("-set-priority".equals(cmd)) { 146 if (argv.length != 3) { 147 displayUsage(cmd); 148 return exitCode; 149 } 150 jobid = argv[1]; 151 try { 152 jp = JobPriority.valueOf(argv[2]); 153 } catch (IllegalArgumentException iae) { 154 LOG.info(iae); 155 displayUsage(cmd); 156 return exitCode; 157 } 158 setJobPriority = true; 159 } else if ("-events".equals(cmd)) { 160 if (argv.length != 4) { 161 displayUsage(cmd); 162 return exitCode; 163 } 164 jobid = argv[1]; 165 fromEvent = Integer.parseInt(argv[2]); 166 nEvents = Integer.parseInt(argv[3]); 167 listEvents = true; 168 } else if ("-history".equals(cmd)) { 169 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) { 170 displayUsage(cmd); 171 return exitCode; 172 } 173 viewHistory = true; 174 if (argv.length == 3 && "all".equals(argv[1])) { 175 viewAllHistory = true; 176 historyFile = argv[2]; 177 } else { 178 historyFile = argv[1]; 179 } 180 } else if ("-list".equals(cmd)) { 181 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { 182 displayUsage(cmd); 183 return exitCode; 184 } 185 if (argv.length == 2 && "all".equals(argv[1])) { 186 listAllJobs = true; 187 } else { 188 listJobs = true; 189 } 190 } else if("-kill-task".equals(cmd)) { 191 if (argv.length != 2) { 192 displayUsage(cmd); 193 return exitCode; 194 } 195 killTask = true; 196 taskid = argv[1]; 197 } else if("-fail-task".equals(cmd)) { 198 if (argv.length != 2) { 199 displayUsage(cmd); 200 return exitCode; 201 } 202 failTask = true; 203 taskid = argv[1]; 204 } else if ("-list-active-trackers".equals(cmd)) { 205 if (argv.length != 1) { 206 displayUsage(cmd); 207 return exitCode; 208 } 209 listActiveTrackers = true; 210 } else if ("-list-blacklisted-trackers".equals(cmd)) { 211 if (argv.length != 1) { 212 displayUsage(cmd); 213 return exitCode; 214 } 215 listBlacklistedTrackers = true; 216 } else if ("-list-attempt-ids".equals(cmd)) { 217 if (argv.length != 4) { 218 displayUsage(cmd); 219 return exitCode; 220 } 221 jobid = argv[1]; 222 taskType = argv[2]; 223 taskState = argv[3]; 224 displayTasks = true; 225 if (!taskTypes.contains(taskType.toUpperCase())) { 226 System.out.println("Error: Invalid task-type: " + taskType); 227 displayUsage(cmd); 228 return exitCode; 229 } 230 if (!taskStates.contains(taskState.toLowerCase())) { 231 System.out.println("Error: Invalid task-state: " + taskState); 232 displayUsage(cmd); 233 return exitCode; 234 } 235 } else if ("-logs".equals(cmd)) { 236 if (argv.length == 2 || argv.length ==3) { 237 logs = true; 238 jobid = argv[1]; 239 if (argv.length == 3) { 240 taskid = argv[2]; 241 } else { 242 taskid = null; 243 } 244 } else { 245 displayUsage(cmd); 246 return exitCode; 247 } 248 } else { 249 displayUsage(cmd); 250 return exitCode; 251 } 252 253 // initialize cluster 254 cluster = createCluster(); 255 256 // Submit the request 257 try { 258 if (submitJobFile != null) { 259 Job job = Job.getInstance(new JobConf(submitJobFile)); 260 job.submit(); 261 System.out.println("Created job " + job.getJobID()); 262 exitCode = 0; 263 } else if (getStatus) { 264 Job job = cluster.getJob(JobID.forName(jobid)); 265 if (job == null) { 266 System.out.println("Could not find job " + jobid); 267 } else { 268 Counters counters = job.getCounters(); 269 System.out.println(); 270 System.out.println(job); 271 if (counters != null) { 272 System.out.println(counters); 273 } else { 274 System.out.println("Counters not available. Job is retired."); 275 } 276 exitCode = 0; 277 } 278 } else if (getCounter) { 279 Job job = cluster.getJob(JobID.forName(jobid)); 280 if (job == null) { 281 System.out.println("Could not find job " + jobid); 282 } else { 283 Counters counters = job.getCounters(); 284 if (counters == null) { 285 System.out.println("Counters not available for retired job " + 286 jobid); 287 exitCode = -1; 288 } else { 289 System.out.println(getCounter(counters, 290 counterGroupName, counterName)); 291 exitCode = 0; 292 } 293 } 294 } else if (killJob) { 295 Job job = cluster.getJob(JobID.forName(jobid)); 296 if (job == null) { 297 System.out.println("Could not find job " + jobid); 298 } else { 299 job.killJob(); 300 System.out.println("Killed job " + jobid); 301 exitCode = 0; 302 } 303 } else if (setJobPriority) { 304 Job job = cluster.getJob(JobID.forName(jobid)); 305 if (job == null) { 306 System.out.println("Could not find job " + jobid); 307 } else { 308 job.setPriority(jp); 309 System.out.println("Changed job priority."); 310 exitCode = 0; 311 } 312 } else if (viewHistory) { 313 viewHistory(historyFile, viewAllHistory); 314 exitCode = 0; 315 } else if (listEvents) { 316 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents); 317 exitCode = 0; 318 } else if (listJobs) { 319 listJobs(cluster); 320 exitCode = 0; 321 } else if (listAllJobs) { 322 listAllJobs(cluster); 323 exitCode = 0; 324 } else if (listActiveTrackers) { 325 listActiveTrackers(cluster); 326 exitCode = 0; 327 } else if (listBlacklistedTrackers) { 328 listBlacklistedTrackers(cluster); 329 exitCode = 0; 330 } else if (displayTasks) { 331 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState); 332 exitCode = 0; 333 } else if(killTask) { 334 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 335 Job job = cluster.getJob(taskID.getJobID()); 336 if (job == null) { 337 System.out.println("Could not find job " + jobid); 338 } else if (job.killTask(taskID, false)) { 339 System.out.println("Killed task " + taskid); 340 exitCode = 0; 341 } else { 342 System.out.println("Could not kill task " + taskid); 343 exitCode = -1; 344 } 345 } else if(failTask) { 346 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 347 Job job = cluster.getJob(taskID.getJobID()); 348 if (job == null) { 349 System.out.println("Could not find job " + jobid); 350 } else if(job.killTask(taskID, true)) { 351 System.out.println("Killed task " + taskID + " by failing it"); 352 exitCode = 0; 353 } else { 354 System.out.println("Could not fail task " + taskid); 355 exitCode = -1; 356 } 357 } else if (logs) { 358 try { 359 JobID jobID = JobID.forName(jobid); 360 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); 361 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); 362 LogCLIHelpers logDumper = new LogCLIHelpers(); 363 logDumper.setConf(getConf()); 364 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(), 365 logParams.getContainerId(), logParams.getNodeId(), 366 logParams.getOwner()); 367 } catch (IOException e) { 368 if (e instanceof RemoteException) { 369 throw e; 370 } 371 System.out.println(e.getMessage()); 372 } 373 } 374 } catch (RemoteException re) { 375 IOException unwrappedException = re.unwrapRemoteException(); 376 if (unwrappedException instanceof AccessControlException) { 377 System.out.println(unwrappedException.getMessage()); 378 } else { 379 throw re; 380 } 381 } finally { 382 cluster.close(); 383 } 384 return exitCode; 385 } 386 387 Cluster createCluster() throws IOException { 388 return new Cluster(getConf()); 389 } 390 391 private String getJobPriorityNames() { 392 StringBuffer sb = new StringBuffer(); 393 for (JobPriority p : JobPriority.values()) { 394 sb.append(p.name()).append(" "); 395 } 396 return sb.substring(0, sb.length()-1); 397 } 398 399 private String getTaskTypes() { 400 return StringUtils.join(taskTypes, " "); 401 } 402 403 /** 404 * Display usage of the command-line tool and terminate execution. 405 */ 406 private void displayUsage(String cmd) { 407 String prefix = "Usage: CLI "; 408 String jobPriorityValues = getJobPriorityNames(); 409 String taskStates = "running, completed"; 410 411 if ("-submit".equals(cmd)) { 412 System.err.println(prefix + "[" + cmd + " <job-file>]"); 413 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { 414 System.err.println(prefix + "[" + cmd + " <job-id>]"); 415 } else if ("-counter".equals(cmd)) { 416 System.err.println(prefix + "[" + cmd + 417 " <job-id> <group-name> <counter-name>]"); 418 } else if ("-events".equals(cmd)) { 419 System.err.println(prefix + "[" + cmd + 420 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1."); 421 } else if ("-history".equals(cmd)) { 422 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]"); 423 } else if ("-list".equals(cmd)) { 424 System.err.println(prefix + "[" + cmd + " [all]]"); 425 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { 426 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]"); 427 } else if ("-set-priority".equals(cmd)) { 428 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + 429 "Valid values for priorities are: " 430 + jobPriorityValues); 431 } else if ("-list-active-trackers".equals(cmd)) { 432 System.err.println(prefix + "[" + cmd + "]"); 433 } else if ("-list-blacklisted-trackers".equals(cmd)) { 434 System.err.println(prefix + "[" + cmd + "]"); 435 } else if ("-list-attempt-ids".equals(cmd)) { 436 System.err.println(prefix + "[" + cmd + 437 " <job-id> <task-type> <task-state>]. " + 438 "Valid values for <task-type> are " + getTaskTypes() + ". " + 439 "Valid values for <task-state> are " + taskStates); 440 } else if ("-logs".equals(cmd)) { 441 System.err.println(prefix + "[" + cmd + 442 " <job-id> <task-attempt-id>]. " + 443 " <task-attempt-id> is optional to get task attempt logs."); 444 } else { 445 System.err.printf(prefix + "<command> <args>%n"); 446 System.err.printf("\t[-submit <job-file>]%n"); 447 System.err.printf("\t[-status <job-id>]%n"); 448 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n"); 449 System.err.printf("\t[-kill <job-id>]%n"); 450 System.err.printf("\t[-set-priority <job-id> <priority>]. " + 451 "Valid values for priorities are: " + jobPriorityValues + "%n"); 452 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n"); 453 System.err.printf("\t[-history <jobHistoryFile>]%n"); 454 System.err.printf("\t[-list [all]]%n"); 455 System.err.printf("\t[-list-active-trackers]%n"); 456 System.err.printf("\t[-list-blacklisted-trackers]%n"); 457 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " + 458 "<task-state>]. " + 459 "Valid values for <task-type> are " + getTaskTypes() + ". " + 460 "Valid values for <task-state> are " + taskStates); 461 System.err.printf("\t[-kill-task <task-attempt-id>]%n"); 462 System.err.printf("\t[-fail-task <task-attempt-id>]%n"); 463 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n"); 464 ToolRunner.printGenericCommandUsage(System.out); 465 } 466 } 467 468 private void viewHistory(String historyFile, boolean all) 469 throws IOException { 470 HistoryViewer historyViewer = new HistoryViewer(historyFile, 471 getConf(), all); 472 historyViewer.print(); 473 } 474 475 protected long getCounter(Counters counters, String counterGroupName, 476 String counterName) throws IOException { 477 return counters.findCounter(counterGroupName, counterName).getValue(); 478 } 479 480 /** 481 * List the events for the given job 482 * @param jobId the job id for the job's events to list 483 * @throws IOException 484 */ 485 private void listEvents(Job job, int fromEventId, int numEvents) 486 throws IOException, InterruptedException { 487 TaskCompletionEvent[] events = job. 488 getTaskCompletionEvents(fromEventId, numEvents); 489 System.out.println("Task completion events for " + job.getJobID()); 490 System.out.println("Number of events (from " + fromEventId + ") are: " 491 + events.length); 492 for(TaskCompletionEvent event: events) { 493 System.out.println(event.getStatus() + " " + 494 event.getTaskAttemptId() + " " + 495 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp())); 496 } 497 } 498 499 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 500 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 501 } 502 503 504 /** 505 * Dump a list of currently running jobs 506 * @throws IOException 507 */ 508 private void listJobs(Cluster cluster) 509 throws IOException, InterruptedException { 510 List<JobStatus> runningJobs = new ArrayList<JobStatus>(); 511 for (JobStatus job : cluster.getAllJobStatuses()) { 512 if (!job.isJobComplete()) { 513 runningJobs.add(job); 514 } 515 } 516 displayJobList(runningJobs.toArray(new JobStatus[0])); 517 } 518 519 /** 520 * Dump a list of all jobs submitted. 521 * @throws IOException 522 */ 523 private void listAllJobs(Cluster cluster) 524 throws IOException, InterruptedException { 525 displayJobList(cluster.getAllJobStatuses()); 526 } 527 528 /** 529 * Display the list of active trackers 530 */ 531 private void listActiveTrackers(Cluster cluster) 532 throws IOException, InterruptedException { 533 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers(); 534 for (TaskTrackerInfo tracker : trackers) { 535 System.out.println(tracker.getTaskTrackerName()); 536 } 537 } 538 539 /** 540 * Display the list of blacklisted trackers 541 */ 542 private void listBlacklistedTrackers(Cluster cluster) 543 throws IOException, InterruptedException { 544 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers(); 545 if (trackers.length > 0) { 546 System.out.println("BlackListedNode \t Reason"); 547 } 548 for (TaskTrackerInfo tracker : trackers) { 549 System.out.println(tracker.getTaskTrackerName() + "\t" + 550 tracker.getReasonForBlacklist()); 551 } 552 } 553 554 private void printTaskAttempts(TaskReport report) { 555 if (report.getCurrentStatus() == TIPStatus.COMPLETE) { 556 System.out.println(report.getSuccessfulTaskAttemptId()); 557 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) { 558 for (TaskAttemptID t : 559 report.getRunningTaskAttemptIds()) { 560 System.out.println(t); 561 } 562 } 563 } 564 565 /** 566 * Display the information about a job's tasks, of a particular type and 567 * in a particular state 568 * 569 * @param job the job 570 * @param type the type of the task (map/reduce/setup/cleanup) 571 * @param state the state of the task 572 * (pending/running/completed/failed/killed) 573 */ 574 protected void displayTasks(Job job, String type, String state) 575 throws IOException, InterruptedException { 576 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase())); 577 for (TaskReport report : reports) { 578 TIPStatus status = report.getCurrentStatus(); 579 if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) || 580 (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) || 581 (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) || 582 (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) || 583 (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) { 584 printTaskAttempts(report); 585 } 586 } 587 } 588 589 public void displayJobList(JobStatus[] jobs) 590 throws IOException, InterruptedException { 591 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out, 592 Charsets.UTF_8))); 593 } 594 595 @Private 596 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 597 @Private 598 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 599 private static String memPattern = "%dM"; 600 private static String UNAVAILABLE = "N/A"; 601 602 @Private 603 public void displayJobList(JobStatus[] jobs, PrintWriter writer) { 604 writer.println("Total jobs:" + jobs.length); 605 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName", 606 "Queue", "Priority", "UsedContainers", 607 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); 608 for (JobStatus job : jobs) { 609 int numUsedSlots = job.getNumUsedSlots(); 610 int numReservedSlots = job.getNumReservedSlots(); 611 int usedMem = job.getUsedMem(); 612 int rsvdMem = job.getReservedMem(); 613 int neededMem = job.getNeededMem(); 614 writer.printf(dataPattern, 615 job.getJobID().toString(), job.getState(), job.getStartTime(), 616 job.getUsername(), job.getQueue(), 617 job.getPriority().name(), 618 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots, 619 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots, 620 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem), 621 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem), 622 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem), 623 job.getSchedulingInfo()); 624 } 625 writer.flush(); 626 } 627 628 public static void main(String[] argv) throws Exception { 629 int res = ToolRunner.run(new CLI(), argv); 630 ExitUtil.terminate(res); 631 } 632}