001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.tools;
019
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;

import com.google.common.base.Charsets;
060
061 /**
062 * Interprets the map reduce cli options
063 */
064 @InterfaceAudience.Public
065 @InterfaceStability.Stable
066 public class CLI extends Configured implements Tool {
067 private static final Log LOG = LogFactory.getLog(CLI.class);
068 protected Cluster cluster;
069 private static final Set<String> taskTypes = new HashSet<String>(
070 Arrays.asList("MAP", "REDUCE"));
071 private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
072 "running", "completed", "pending", "failed", "killed"));
073
074 public CLI() {
075 }
076
077 public CLI(Configuration conf) {
078 setConf(conf);
079 }
080
081 public int run(String[] argv) throws Exception {
082 int exitCode = -1;
083 if (argv.length < 1) {
084 displayUsage("");
085 return exitCode;
086 }
087 // process arguments
088 String cmd = argv[0];
089 String submitJobFile = null;
090 String jobid = null;
091 String taskid = null;
092 String historyFile = null;
093 String counterGroupName = null;
094 String counterName = null;
095 JobPriority jp = null;
096 String taskType = null;
097 String taskState = null;
098 int fromEvent = 0;
099 int nEvents = 0;
100 boolean getStatus = false;
101 boolean getCounter = false;
102 boolean killJob = false;
103 boolean listEvents = false;
104 boolean viewHistory = false;
105 boolean viewAllHistory = false;
106 boolean listJobs = false;
107 boolean listAllJobs = false;
108 boolean listActiveTrackers = false;
109 boolean listBlacklistedTrackers = false;
110 boolean displayTasks = false;
111 boolean killTask = false;
112 boolean failTask = false;
113 boolean setJobPriority = false;
114 boolean logs = false;
115
116 if ("-submit".equals(cmd)) {
117 if (argv.length != 2) {
118 displayUsage(cmd);
119 return exitCode;
120 }
121 submitJobFile = argv[1];
122 } else if ("-status".equals(cmd)) {
123 if (argv.length != 2) {
124 displayUsage(cmd);
125 return exitCode;
126 }
127 jobid = argv[1];
128 getStatus = true;
129 } else if("-counter".equals(cmd)) {
130 if (argv.length != 4) {
131 displayUsage(cmd);
132 return exitCode;
133 }
134 getCounter = true;
135 jobid = argv[1];
136 counterGroupName = argv[2];
137 counterName = argv[3];
138 } else if ("-kill".equals(cmd)) {
139 if (argv.length != 2) {
140 displayUsage(cmd);
141 return exitCode;
142 }
143 jobid = argv[1];
144 killJob = true;
145 } else if ("-set-priority".equals(cmd)) {
146 if (argv.length != 3) {
147 displayUsage(cmd);
148 return exitCode;
149 }
150 jobid = argv[1];
151 try {
152 jp = JobPriority.valueOf(argv[2]);
153 } catch (IllegalArgumentException iae) {
154 LOG.info(iae);
155 displayUsage(cmd);
156 return exitCode;
157 }
158 setJobPriority = true;
159 } else if ("-events".equals(cmd)) {
160 if (argv.length != 4) {
161 displayUsage(cmd);
162 return exitCode;
163 }
164 jobid = argv[1];
165 fromEvent = Integer.parseInt(argv[2]);
166 nEvents = Integer.parseInt(argv[3]);
167 listEvents = true;
168 } else if ("-history".equals(cmd)) {
169 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
170 displayUsage(cmd);
171 return exitCode;
172 }
173 viewHistory = true;
174 if (argv.length == 3 && "all".equals(argv[1])) {
175 viewAllHistory = true;
176 historyFile = argv[2];
177 } else {
178 historyFile = argv[1];
179 }
180 } else if ("-list".equals(cmd)) {
181 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
182 displayUsage(cmd);
183 return exitCode;
184 }
185 if (argv.length == 2 && "all".equals(argv[1])) {
186 listAllJobs = true;
187 } else {
188 listJobs = true;
189 }
190 } else if("-kill-task".equals(cmd)) {
191 if (argv.length != 2) {
192 displayUsage(cmd);
193 return exitCode;
194 }
195 killTask = true;
196 taskid = argv[1];
197 } else if("-fail-task".equals(cmd)) {
198 if (argv.length != 2) {
199 displayUsage(cmd);
200 return exitCode;
201 }
202 failTask = true;
203 taskid = argv[1];
204 } else if ("-list-active-trackers".equals(cmd)) {
205 if (argv.length != 1) {
206 displayUsage(cmd);
207 return exitCode;
208 }
209 listActiveTrackers = true;
210 } else if ("-list-blacklisted-trackers".equals(cmd)) {
211 if (argv.length != 1) {
212 displayUsage(cmd);
213 return exitCode;
214 }
215 listBlacklistedTrackers = true;
216 } else if ("-list-attempt-ids".equals(cmd)) {
217 if (argv.length != 4) {
218 displayUsage(cmd);
219 return exitCode;
220 }
221 jobid = argv[1];
222 taskType = argv[2];
223 taskState = argv[3];
224 displayTasks = true;
225 if (!taskTypes.contains(taskType.toUpperCase())) {
226 System.out.println("Error: Invalid task-type: " + taskType);
227 displayUsage(cmd);
228 return exitCode;
229 }
230 if (!taskStates.contains(taskState.toLowerCase())) {
231 System.out.println("Error: Invalid task-state: " + taskState);
232 displayUsage(cmd);
233 return exitCode;
234 }
235 } else if ("-logs".equals(cmd)) {
236 if (argv.length == 2 || argv.length ==3) {
237 logs = true;
238 jobid = argv[1];
239 if (argv.length == 3) {
240 taskid = argv[2];
241 } else {
242 taskid = null;
243 }
244 } else {
245 displayUsage(cmd);
246 return exitCode;
247 }
248 } else {
249 displayUsage(cmd);
250 return exitCode;
251 }
252
253 // initialize cluster
254 cluster = createCluster();
255
256 // Submit the request
257 try {
258 if (submitJobFile != null) {
259 Job job = Job.getInstance(new JobConf(submitJobFile));
260 job.submit();
261 System.out.println("Created job " + job.getJobID());
262 exitCode = 0;
263 } else if (getStatus) {
264 Job job = cluster.getJob(JobID.forName(jobid));
265 if (job == null) {
266 System.out.println("Could not find job " + jobid);
267 } else {
268 Counters counters = job.getCounters();
269 System.out.println();
270 System.out.println(job);
271 if (counters != null) {
272 System.out.println(counters);
273 } else {
274 System.out.println("Counters not available. Job is retired.");
275 }
276 exitCode = 0;
277 }
278 } else if (getCounter) {
279 Job job = cluster.getJob(JobID.forName(jobid));
280 if (job == null) {
281 System.out.println("Could not find job " + jobid);
282 } else {
283 Counters counters = job.getCounters();
284 if (counters == null) {
285 System.out.println("Counters not available for retired job " +
286 jobid);
287 exitCode = -1;
288 } else {
289 System.out.println(getCounter(counters,
290 counterGroupName, counterName));
291 exitCode = 0;
292 }
293 }
294 } else if (killJob) {
295 Job job = cluster.getJob(JobID.forName(jobid));
296 if (job == null) {
297 System.out.println("Could not find job " + jobid);
298 } else {
299 job.killJob();
300 System.out.println("Killed job " + jobid);
301 exitCode = 0;
302 }
303 } else if (setJobPriority) {
304 Job job = cluster.getJob(JobID.forName(jobid));
305 if (job == null) {
306 System.out.println("Could not find job " + jobid);
307 } else {
308 job.setPriority(jp);
309 System.out.println("Changed job priority.");
310 exitCode = 0;
311 }
312 } else if (viewHistory) {
313 viewHistory(historyFile, viewAllHistory);
314 exitCode = 0;
315 } else if (listEvents) {
316 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
317 exitCode = 0;
318 } else if (listJobs) {
319 listJobs(cluster);
320 exitCode = 0;
321 } else if (listAllJobs) {
322 listAllJobs(cluster);
323 exitCode = 0;
324 } else if (listActiveTrackers) {
325 listActiveTrackers(cluster);
326 exitCode = 0;
327 } else if (listBlacklistedTrackers) {
328 listBlacklistedTrackers(cluster);
329 exitCode = 0;
330 } else if (displayTasks) {
331 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
332 exitCode = 0;
333 } else if(killTask) {
334 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
335 Job job = cluster.getJob(taskID.getJobID());
336 if (job == null) {
337 System.out.println("Could not find job " + jobid);
338 } else if (job.killTask(taskID, false)) {
339 System.out.println("Killed task " + taskid);
340 exitCode = 0;
341 } else {
342 System.out.println("Could not kill task " + taskid);
343 exitCode = -1;
344 }
345 } else if(failTask) {
346 TaskAttemptID taskID = TaskAttemptID.forName(taskid);
347 Job job = cluster.getJob(taskID.getJobID());
348 if (job == null) {
349 System.out.println("Could not find job " + jobid);
350 } else if(job.killTask(taskID, true)) {
351 System.out.println("Killed task " + taskID + " by failing it");
352 exitCode = 0;
353 } else {
354 System.out.println("Could not fail task " + taskid);
355 exitCode = -1;
356 }
357 } else if (logs) {
358 try {
359 JobID jobID = JobID.forName(jobid);
360 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
361 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
362 LogCLIHelpers logDumper = new LogCLIHelpers();
363 logDumper.setConf(getConf());
364 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
365 logParams.getContainerId(), logParams.getNodeId(),
366 logParams.getOwner());
367 } catch (IOException e) {
368 if (e instanceof RemoteException) {
369 throw e;
370 }
371 System.out.println(e.getMessage());
372 }
373 }
374 } catch (RemoteException re) {
375 IOException unwrappedException = re.unwrapRemoteException();
376 if (unwrappedException instanceof AccessControlException) {
377 System.out.println(unwrappedException.getMessage());
378 } else {
379 throw re;
380 }
381 } finally {
382 cluster.close();
383 }
384 return exitCode;
385 }
386
387 Cluster createCluster() throws IOException {
388 return new Cluster(getConf());
389 }
390
391 private String getJobPriorityNames() {
392 StringBuffer sb = new StringBuffer();
393 for (JobPriority p : JobPriority.values()) {
394 sb.append(p.name()).append(" ");
395 }
396 return sb.substring(0, sb.length()-1);
397 }
398
399 private String getTaskTypes() {
400 return StringUtils.join(taskTypes, " ");
401 }
402
403 /**
404 * Display usage of the command-line tool and terminate execution.
405 */
406 private void displayUsage(String cmd) {
407 String prefix = "Usage: CLI ";
408 String jobPriorityValues = getJobPriorityNames();
409 String taskStates = "running, completed";
410
411 if ("-submit".equals(cmd)) {
412 System.err.println(prefix + "[" + cmd + " <job-file>]");
413 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
414 System.err.println(prefix + "[" + cmd + " <job-id>]");
415 } else if ("-counter".equals(cmd)) {
416 System.err.println(prefix + "[" + cmd +
417 " <job-id> <group-name> <counter-name>]");
418 } else if ("-events".equals(cmd)) {
419 System.err.println(prefix + "[" + cmd +
420 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
421 } else if ("-history".equals(cmd)) {
422 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
423 } else if ("-list".equals(cmd)) {
424 System.err.println(prefix + "[" + cmd + " [all]]");
425 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
426 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
427 } else if ("-set-priority".equals(cmd)) {
428 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
429 "Valid values for priorities are: "
430 + jobPriorityValues);
431 } else if ("-list-active-trackers".equals(cmd)) {
432 System.err.println(prefix + "[" + cmd + "]");
433 } else if ("-list-blacklisted-trackers".equals(cmd)) {
434 System.err.println(prefix + "[" + cmd + "]");
435 } else if ("-list-attempt-ids".equals(cmd)) {
436 System.err.println(prefix + "[" + cmd +
437 " <job-id> <task-type> <task-state>]. " +
438 "Valid values for <task-type> are " + getTaskTypes() + ". " +
439 "Valid values for <task-state> are " + taskStates);
440 } else if ("-logs".equals(cmd)) {
441 System.err.println(prefix + "[" + cmd +
442 " <job-id> <task-attempt-id>]. " +
443 " <task-attempt-id> is optional to get task attempt logs.");
444 } else {
445 System.err.printf(prefix + "<command> <args>%n");
446 System.err.printf("\t[-submit <job-file>]%n");
447 System.err.printf("\t[-status <job-id>]%n");
448 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
449 System.err.printf("\t[-kill <job-id>]%n");
450 System.err.printf("\t[-set-priority <job-id> <priority>]. " +
451 "Valid values for priorities are: " + jobPriorityValues + "%n");
452 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
453 System.err.printf("\t[-history <jobHistoryFile>]%n");
454 System.err.printf("\t[-list [all]]%n");
455 System.err.printf("\t[-list-active-trackers]%n");
456 System.err.printf("\t[-list-blacklisted-trackers]%n");
457 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
458 "<task-state>]. " +
459 "Valid values for <task-type> are " + getTaskTypes() + ". " +
460 "Valid values for <task-state> are " + taskStates);
461 System.err.printf("\t[-kill-task <task-attempt-id>]%n");
462 System.err.printf("\t[-fail-task <task-attempt-id>]%n");
463 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
464 ToolRunner.printGenericCommandUsage(System.out);
465 }
466 }
467
468 private void viewHistory(String historyFile, boolean all)
469 throws IOException {
470 HistoryViewer historyViewer = new HistoryViewer(historyFile,
471 getConf(), all);
472 historyViewer.print();
473 }
474
475 protected long getCounter(Counters counters, String counterGroupName,
476 String counterName) throws IOException {
477 return counters.findCounter(counterGroupName, counterName).getValue();
478 }
479
480 /**
481 * List the events for the given job
482 * @param jobId the job id for the job's events to list
483 * @throws IOException
484 */
485 private void listEvents(Job job, int fromEventId, int numEvents)
486 throws IOException, InterruptedException {
487 TaskCompletionEvent[] events = job.
488 getTaskCompletionEvents(fromEventId, numEvents);
489 System.out.println("Task completion events for " + job.getJobID());
490 System.out.println("Number of events (from " + fromEventId + ") are: "
491 + events.length);
492 for(TaskCompletionEvent event: events) {
493 System.out.println(event.getStatus() + " " +
494 event.getTaskAttemptId() + " " +
495 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
496 }
497 }
498
499 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
500 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
501 }
502
503
504 /**
505 * Dump a list of currently running jobs
506 * @throws IOException
507 */
508 private void listJobs(Cluster cluster)
509 throws IOException, InterruptedException {
510 List<JobStatus> runningJobs = new ArrayList<JobStatus>();
511 for (JobStatus job : cluster.getAllJobStatuses()) {
512 if (!job.isJobComplete()) {
513 runningJobs.add(job);
514 }
515 }
516 displayJobList(runningJobs.toArray(new JobStatus[0]));
517 }
518
519 /**
520 * Dump a list of all jobs submitted.
521 * @throws IOException
522 */
523 private void listAllJobs(Cluster cluster)
524 throws IOException, InterruptedException {
525 displayJobList(cluster.getAllJobStatuses());
526 }
527
528 /**
529 * Display the list of active trackers
530 */
531 private void listActiveTrackers(Cluster cluster)
532 throws IOException, InterruptedException {
533 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
534 for (TaskTrackerInfo tracker : trackers) {
535 System.out.println(tracker.getTaskTrackerName());
536 }
537 }
538
539 /**
540 * Display the list of blacklisted trackers
541 */
542 private void listBlacklistedTrackers(Cluster cluster)
543 throws IOException, InterruptedException {
544 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
545 if (trackers.length > 0) {
546 System.out.println("BlackListedNode \t Reason");
547 }
548 for (TaskTrackerInfo tracker : trackers) {
549 System.out.println(tracker.getTaskTrackerName() + "\t" +
550 tracker.getReasonForBlacklist());
551 }
552 }
553
554 private void printTaskAttempts(TaskReport report) {
555 if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
556 System.out.println(report.getSuccessfulTaskAttemptId());
557 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
558 for (TaskAttemptID t :
559 report.getRunningTaskAttemptIds()) {
560 System.out.println(t);
561 }
562 }
563 }
564
565 /**
566 * Display the information about a job's tasks, of a particular type and
567 * in a particular state
568 *
569 * @param job the job
570 * @param type the type of the task (map/reduce/setup/cleanup)
571 * @param state the state of the task
572 * (pending/running/completed/failed/killed)
573 */
574 protected void displayTasks(Job job, String type, String state)
575 throws IOException, InterruptedException {
576 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
577 for (TaskReport report : reports) {
578 TIPStatus status = report.getCurrentStatus();
579 if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
580 (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
581 (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
582 (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
583 (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
584 printTaskAttempts(report);
585 }
586 }
587 }
588
589 public void displayJobList(JobStatus[] jobs)
590 throws IOException, InterruptedException {
591 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
592 Charsets.UTF_8)));
593 }
594
595 @Private
596 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
597 @Private
598 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
599 private static String memPattern = "%dM";
600 private static String UNAVAILABLE = "N/A";
601
602 @Private
603 public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
604 writer.println("Total jobs:" + jobs.length);
605 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
606 "Queue", "Priority", "UsedContainers",
607 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
608 for (JobStatus job : jobs) {
609 int numUsedSlots = job.getNumUsedSlots();
610 int numReservedSlots = job.getNumReservedSlots();
611 int usedMem = job.getUsedMem();
612 int rsvdMem = job.getReservedMem();
613 int neededMem = job.getNeededMem();
614 writer.printf(dataPattern,
615 job.getJobID().toString(), job.getState(), job.getStartTime(),
616 job.getUsername(), job.getQueue(),
617 job.getPriority().name(),
618 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
619 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
620 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
621 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
622 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
623 job.getSchedulingInfo());
624 }
625 writer.flush();
626 }
627
628 public static void main(String[] argv) throws Exception {
629 int res = ToolRunner.run(new CLI(), argv);
630 ExitUtil.terminate(res);
631 }
632 }