001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.tools;
019
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogDumper;
057
058/**
059 * Interprets the map reduce cli options 
060 */
061@InterfaceAudience.Public
062@InterfaceStability.Stable
063public class CLI extends Configured implements Tool {
064  private static final Log LOG = LogFactory.getLog(CLI.class);
065  protected Cluster cluster;
066  private static final Set<String> taskTypes = new HashSet<String>(
067      Arrays.asList("MAP", "REDUCE"));
068  private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
069      "running", "completed", "pending", "failed", "killed"));
070 
071  public CLI() {
072  }
073  
074  public CLI(Configuration conf) {
075    setConf(conf);
076  }
077  
078  public int run(String[] argv) throws Exception {
079    int exitCode = -1;
080    if (argv.length < 1) {
081      displayUsage("");
082      return exitCode;
083    }    
084    // process arguments
085    String cmd = argv[0];
086    String submitJobFile = null;
087    String jobid = null;
088    String taskid = null;
089    String historyFile = null;
090    String counterGroupName = null;
091    String counterName = null;
092    JobPriority jp = null;
093    String taskType = null;
094    String taskState = null;
095    int fromEvent = 0;
096    int nEvents = 0;
097    boolean getStatus = false;
098    boolean getCounter = false;
099    boolean killJob = false;
100    boolean listEvents = false;
101    boolean viewHistory = false;
102    boolean viewAllHistory = false;
103    boolean listJobs = false;
104    boolean listAllJobs = false;
105    boolean listActiveTrackers = false;
106    boolean listBlacklistedTrackers = false;
107    boolean displayTasks = false;
108    boolean killTask = false;
109    boolean failTask = false;
110    boolean setJobPriority = false;
111    boolean logs = false;
112
113    if ("-submit".equals(cmd)) {
114      if (argv.length != 2) {
115        displayUsage(cmd);
116        return exitCode;
117      }
118      submitJobFile = argv[1];
119    } else if ("-status".equals(cmd)) {
120      if (argv.length != 2) {
121        displayUsage(cmd);
122        return exitCode;
123      }
124      jobid = argv[1];
125      getStatus = true;
126    } else if("-counter".equals(cmd)) {
127      if (argv.length != 4) {
128        displayUsage(cmd);
129        return exitCode;
130      }
131      getCounter = true;
132      jobid = argv[1];
133      counterGroupName = argv[2];
134      counterName = argv[3];
135    } else if ("-kill".equals(cmd)) {
136      if (argv.length != 2) {
137        displayUsage(cmd);
138        return exitCode;
139      }
140      jobid = argv[1];
141      killJob = true;
142    } else if ("-set-priority".equals(cmd)) {
143      if (argv.length != 3) {
144        displayUsage(cmd);
145        return exitCode;
146      }
147      jobid = argv[1];
148      try {
149        jp = JobPriority.valueOf(argv[2]); 
150      } catch (IllegalArgumentException iae) {
151        LOG.info(iae);
152        displayUsage(cmd);
153        return exitCode;
154      }
155      setJobPriority = true; 
156    } else if ("-events".equals(cmd)) {
157      if (argv.length != 4) {
158        displayUsage(cmd);
159        return exitCode;
160      }
161      jobid = argv[1];
162      fromEvent = Integer.parseInt(argv[2]);
163      nEvents = Integer.parseInt(argv[3]);
164      listEvents = true;
165    } else if ("-history".equals(cmd)) {
166      if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
167         displayUsage(cmd);
168         return exitCode;
169      }
170      viewHistory = true;
171      if (argv.length == 3 && "all".equals(argv[1])) {
172        viewAllHistory = true;
173        historyFile = argv[2];
174      } else {
175        historyFile = argv[1];
176      }
177    } else if ("-list".equals(cmd)) {
178      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
179        displayUsage(cmd);
180        return exitCode;
181      }
182      if (argv.length == 2 && "all".equals(argv[1])) {
183        listAllJobs = true;
184      } else {
185        listJobs = true;
186      }
187    } else if("-kill-task".equals(cmd)) {
188      if (argv.length != 2) {
189        displayUsage(cmd);
190        return exitCode;
191      }
192      killTask = true;
193      taskid = argv[1];
194    } else if("-fail-task".equals(cmd)) {
195      if (argv.length != 2) {
196        displayUsage(cmd);
197        return exitCode;
198      }
199      failTask = true;
200      taskid = argv[1];
201    } else if ("-list-active-trackers".equals(cmd)) {
202      if (argv.length != 1) {
203        displayUsage(cmd);
204        return exitCode;
205      }
206      listActiveTrackers = true;
207    } else if ("-list-blacklisted-trackers".equals(cmd)) {
208      if (argv.length != 1) {
209        displayUsage(cmd);
210        return exitCode;
211      }
212      listBlacklistedTrackers = true;
213    } else if ("-list-attempt-ids".equals(cmd)) {
214      if (argv.length != 4) {
215        displayUsage(cmd);
216        return exitCode;
217      }
218      jobid = argv[1];
219      taskType = argv[2];
220      taskState = argv[3];
221      displayTasks = true;
222      if (!taskTypes.contains(taskType.toUpperCase())) {
223        System.out.println("Error: Invalid task-type: " + taskType);
224        displayUsage(cmd);
225        return exitCode;
226      }
227      if (!taskStates.contains(taskState.toLowerCase())) {
228        System.out.println("Error: Invalid task-state: " + taskState);
229        displayUsage(cmd);
230        return exitCode;
231      }
232    } else if ("-logs".equals(cmd)) {
233      if (argv.length == 2 || argv.length ==3) {
234        logs = true;
235        jobid = argv[1];
236        if (argv.length == 3) {
237          taskid = argv[2];
238        }  else {
239          taskid = null;
240        }
241      } else {
242        displayUsage(cmd);
243        return exitCode;
244      }
245    } else {
246      displayUsage(cmd);
247      return exitCode;
248    }
249
250    // initialize cluster
251    cluster = createCluster();
252        
253    // Submit the request
254    try {
255      if (submitJobFile != null) {
256        Job job = Job.getInstance(new JobConf(submitJobFile));
257        job.submit();
258        System.out.println("Created job " + job.getJobID());
259        exitCode = 0;
260      } else if (getStatus) {
261        Job job = cluster.getJob(JobID.forName(jobid));
262        if (job == null) {
263          System.out.println("Could not find job " + jobid);
264        } else {
265          Counters counters = job.getCounters();
266          System.out.println();
267          System.out.println(job);
268          if (counters != null) {
269            System.out.println(counters);
270          } else {
271            System.out.println("Counters not available. Job is retired.");
272          }
273          exitCode = 0;
274        }
275      } else if (getCounter) {
276        Job job = cluster.getJob(JobID.forName(jobid));
277        if (job == null) {
278          System.out.println("Could not find job " + jobid);
279        } else {
280          Counters counters = job.getCounters();
281          if (counters == null) {
282            System.out.println("Counters not available for retired job " + 
283            jobid);
284            exitCode = -1;
285          } else {
286            System.out.println(getCounter(counters,
287              counterGroupName, counterName));
288            exitCode = 0;
289          }
290        }
291      } else if (killJob) {
292        Job job = cluster.getJob(JobID.forName(jobid));
293        if (job == null) {
294          System.out.println("Could not find job " + jobid);
295        } else {
296          job.killJob();
297          System.out.println("Killed job " + jobid);
298          exitCode = 0;
299        }
300      } else if (setJobPriority) {
301        Job job = cluster.getJob(JobID.forName(jobid));
302        if (job == null) {
303          System.out.println("Could not find job " + jobid);
304        } else {
305          job.setPriority(jp);
306          System.out.println("Changed job priority.");
307          exitCode = 0;
308        } 
309      } else if (viewHistory) {
310        viewHistory(historyFile, viewAllHistory);
311        exitCode = 0;
312      } else if (listEvents) {
313        listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
314        exitCode = 0;
315      } else if (listJobs) {
316        listJobs(cluster);
317        exitCode = 0;
318      } else if (listAllJobs) {
319        listAllJobs(cluster);
320        exitCode = 0;
321      } else if (listActiveTrackers) {
322        listActiveTrackers(cluster);
323        exitCode = 0;
324      } else if (listBlacklistedTrackers) {
325        listBlacklistedTrackers(cluster);
326        exitCode = 0;
327      } else if (displayTasks) {
328        displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
329        exitCode = 0;
330      } else if(killTask) {
331        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
332        Job job = cluster.getJob(taskID.getJobID());
333        if (job == null) {
334          System.out.println("Could not find job " + jobid);
335        } else if (job.killTask(taskID)) {
336          System.out.println("Killed task " + taskid);
337          exitCode = 0;
338        } else {
339          System.out.println("Could not kill task " + taskid);
340          exitCode = -1;
341        }
342      } else if(failTask) {
343        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
344        Job job = cluster.getJob(taskID.getJobID());
345        if (job == null) {
346            System.out.println("Could not find job " + jobid);
347        } else if(job.failTask(taskID)) {
348          System.out.println("Killed task " + taskID + " by failing it");
349          exitCode = 0;
350        } else {
351          System.out.println("Could not fail task " + taskid);
352          exitCode = -1;
353        }
354      } else if (logs) {
355        try {
356        JobID jobID = JobID.forName(jobid);
357        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
358        LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
359        LogDumper logDumper = new LogDumper();
360        logDumper.setConf(getConf());
361        exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
362            logParams.getContainerId(), logParams.getNodeId(),
363            logParams.getOwner());
364        } catch (IOException e) {
365          if (e instanceof RemoteException) {
366            throw e;
367          } 
368          System.out.println(e.getMessage());
369        }
370      }
371    } catch (RemoteException re) {
372      IOException unwrappedException = re.unwrapRemoteException();
373      if (unwrappedException instanceof AccessControlException) {
374        System.out.println(unwrappedException.getMessage());
375      } else {
376        throw re;
377      }
378    } finally {
379      cluster.close();
380    }
381    return exitCode;
382  }
383
384  Cluster createCluster() throws IOException {
385    return new Cluster(getConf());
386  }
387
388  private String getJobPriorityNames() {
389    StringBuffer sb = new StringBuffer();
390    for (JobPriority p : JobPriority.values()) {
391      sb.append(p.name()).append(" ");
392    }
393    return sb.substring(0, sb.length()-1);
394  }
395
396  private String getTaskTypes() {
397    return StringUtils.join(taskTypes, " ");
398  }
399
400  /**
401   * Display usage of the command-line tool and terminate execution.
402   */
403  private void displayUsage(String cmd) {
404    String prefix = "Usage: CLI ";
405    String jobPriorityValues = getJobPriorityNames();
406    String taskStates = "running, completed";
407
408    if ("-submit".equals(cmd)) {
409      System.err.println(prefix + "[" + cmd + " <job-file>]");
410    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
411      System.err.println(prefix + "[" + cmd + " <job-id>]");
412    } else if ("-counter".equals(cmd)) {
413      System.err.println(prefix + "[" + cmd + 
414        " <job-id> <group-name> <counter-name>]");
415    } else if ("-events".equals(cmd)) {
416      System.err.println(prefix + "[" + cmd + 
417        " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
418    } else if ("-history".equals(cmd)) {
419      System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
420    } else if ("-list".equals(cmd)) {
421      System.err.println(prefix + "[" + cmd + " [all]]");
422    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
423      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
424    } else if ("-set-priority".equals(cmd)) {
425      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
426          "Valid values for priorities are: " 
427          + jobPriorityValues); 
428    } else if ("-list-active-trackers".equals(cmd)) {
429      System.err.println(prefix + "[" + cmd + "]");
430    } else if ("-list-blacklisted-trackers".equals(cmd)) {
431      System.err.println(prefix + "[" + cmd + "]");
432    } else if ("-list-attempt-ids".equals(cmd)) {
433      System.err.println(prefix + "[" + cmd + 
434          " <job-id> <task-type> <task-state>]. " +
435          "Valid values for <task-type> are " + getTaskTypes() + ". " +
436          "Valid values for <task-state> are " + taskStates);
437    } else if ("-logs".equals(cmd)) {
438      System.err.println(prefix + "[" + cmd +
439          " <job-id> <task-attempt-id>]. " +
440          " <task-attempt-id> is optional to get task attempt logs.");      
441    } else {
442      System.err.printf(prefix + "<command> <args>\n");
443      System.err.printf("\t[-submit <job-file>]\n");
444      System.err.printf("\t[-status <job-id>]\n");
445      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
446      System.err.printf("\t[-kill <job-id>]\n");
447      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
448        "Valid values for priorities are: " + jobPriorityValues + "\n");
449      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
450      System.err.printf("\t[-history <jobHistoryFile>]\n");
451      System.err.printf("\t[-list [all]]\n");
452      System.err.printf("\t[-list-active-trackers]\n");
453      System.err.printf("\t[-list-blacklisted-trackers]\n");
454      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
455        "<task-state>]. " +
456        "Valid values for <task-type> are " + getTaskTypes() + ". " +
457        "Valid values for <task-state> are " + taskStates);
458      System.err.printf("\t[-kill-task <task-attempt-id>]\n");
459      System.err.printf("\t[-fail-task <task-attempt-id>]\n");
460      System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
461      ToolRunner.printGenericCommandUsage(System.out);
462    }
463  }
464    
465  private void viewHistory(String historyFile, boolean all) 
466    throws IOException {
467    HistoryViewer historyViewer = new HistoryViewer(historyFile,
468                                        getConf(), all);
469    historyViewer.print();
470  }
471
472  protected long getCounter(Counters counters, String counterGroupName,
473      String counterName) throws IOException {
474    return counters.findCounter(counterGroupName, counterName).getValue();
475  }
476  
477  /**
478   * List the events for the given job
479   * @param jobId the job id for the job's events to list
480   * @throws IOException
481   */
482  private void listEvents(Job job, int fromEventId, int numEvents)
483      throws IOException, InterruptedException {
484    TaskCompletionEvent[] events = job.
485      getTaskCompletionEvents(fromEventId, numEvents);
486    System.out.println("Task completion events for " + job.getJobID());
487    System.out.println("Number of events (from " + fromEventId + ") are: " 
488      + events.length);
489    for(TaskCompletionEvent event: events) {
490      System.out.println(event.getStatus() + " " + 
491        event.getTaskAttemptId() + " " + 
492        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
493    }
494  }
495
496  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
497    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
498  }
499  
500
501  /**
502   * Dump a list of currently running jobs
503   * @throws IOException
504   */
505  private void listJobs(Cluster cluster) 
506      throws IOException, InterruptedException {
507    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
508    for (JobStatus job : cluster.getAllJobStatuses()) {
509      if (!job.isJobComplete()) {
510        runningJobs.add(job);
511      }
512    }
513    displayJobList(runningJobs.toArray(new JobStatus[0]));
514  }
515    
516  /**
517   * Dump a list of all jobs submitted.
518   * @throws IOException
519   */
520  private void listAllJobs(Cluster cluster) 
521      throws IOException, InterruptedException {
522    displayJobList(cluster.getAllJobStatuses());
523  }
524  
525  /**
526   * Display the list of active trackers
527   */
528  private void listActiveTrackers(Cluster cluster) 
529      throws IOException, InterruptedException {
530    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
531    for (TaskTrackerInfo tracker : trackers) {
532      System.out.println(tracker.getTaskTrackerName());
533    }
534  }
535
536  /**
537   * Display the list of blacklisted trackers
538   */
539  private void listBlacklistedTrackers(Cluster cluster) 
540      throws IOException, InterruptedException {
541    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
542    if (trackers.length > 0) {
543      System.out.println("BlackListedNode \t Reason");
544    }
545    for (TaskTrackerInfo tracker : trackers) {
546      System.out.println(tracker.getTaskTrackerName() + "\t" + 
547        tracker.getReasonForBlacklist());
548    }
549  }
550
551  private void printTaskAttempts(TaskReport report) {
552    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
553      System.out.println(report.getSuccessfulTaskAttemptId());
554    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
555      for (TaskAttemptID t : 
556        report.getRunningTaskAttemptIds()) {
557        System.out.println(t);
558      }
559    }
560  }
561
562  /**
563   * Display the information about a job's tasks, of a particular type and
564   * in a particular state
565   * 
566   * @param job the job
567   * @param type the type of the task (map/reduce/setup/cleanup)
568   * @param state the state of the task 
569   * (pending/running/completed/failed/killed)
570   */
571  protected void displayTasks(Job job, String type, String state) 
572  throws IOException, InterruptedException {
573    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
574    for (TaskReport report : reports) {
575      TIPStatus status = report.getCurrentStatus();
576      if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
577          (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
578          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
579          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
580          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
581        printTaskAttempts(report);
582      }
583    }
584  }
585
586  public void displayJobList(JobStatus[] jobs) 
587      throws IOException, InterruptedException {
588    displayJobList(jobs, new PrintWriter(System.out));
589  }
590
591  @Private
592  public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
593  @Private
594  public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
595  private static String memPattern   = "%dM";
596  private static String UNAVAILABLE  = "N/A";
597
598  @Private
599  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
600    writer.println("Total jobs:" + jobs.length);
601    writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
602      "Queue", "Priority", "UsedContainers",
603      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
604    for (JobStatus job : jobs) {
605      int numUsedSlots = job.getNumUsedSlots();
606      int numReservedSlots = job.getNumReservedSlots();
607      int usedMem = job.getUsedMem();
608      int rsvdMem = job.getReservedMem();
609      int neededMem = job.getNeededMem();
610      writer.printf(dataPattern,
611          job.getJobID().toString(), job.getState(), job.getStartTime(),
612          job.getUsername(), job.getQueue(), 
613          job.getPriority().name(),
614          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
615          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
616          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
617          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
618          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
619          job.getSchedulingInfo());
620    }
621    writer.flush();
622  }
623  
624  public static void main(String[] argv) throws Exception {
625    int res = ToolRunner.run(new CLI(), argv);
626    ExitUtil.terminate(res);
627  }
628}