001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.tools;
019
020import java.io.IOException;
021import java.io.OutputStreamWriter;
022import java.io.PrintWriter;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028
029import org.apache.commons.lang.StringUtils;
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.classification.InterfaceAudience.Private;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.conf.Configured;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.hadoop.mapred.JobConf;
039import org.apache.hadoop.mapred.TIPStatus;
040import org.apache.hadoop.mapreduce.Cluster;
041import org.apache.hadoop.mapreduce.Counters;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.JobID;
044import org.apache.hadoop.mapreduce.JobPriority;
045import org.apache.hadoop.mapreduce.JobStatus;
046import org.apache.hadoop.mapreduce.TaskAttemptID;
047import org.apache.hadoop.mapreduce.TaskCompletionEvent;
048import org.apache.hadoop.mapreduce.TaskReport;
049import org.apache.hadoop.mapreduce.TaskTrackerInfo;
050import org.apache.hadoop.mapreduce.TaskType;
051import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
052import org.apache.hadoop.mapreduce.v2.LogParams;
053import org.apache.hadoop.security.AccessControlException;
054import org.apache.hadoop.util.ExitUtil;
055import org.apache.hadoop.util.Tool;
056import org.apache.hadoop.util.ToolRunner;
057import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
058
059import com.google.common.base.Charsets;
060
061/**
 * Interprets the MapReduce command-line options and carries out the
 * requested job, task or tracker operation.
063 */
064@InterfaceAudience.Public
065@InterfaceStability.Stable
066public class CLI extends Configured implements Tool {
067  private static final Log LOG = LogFactory.getLog(CLI.class);
068  protected Cluster cluster;
069  private static final Set<String> taskTypes = new HashSet<String>(
070      Arrays.asList("MAP", "REDUCE"));
071  private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
072      "running", "completed", "pending", "failed", "killed"));
073 
074  public CLI() {
075  }
076  
077  public CLI(Configuration conf) {
078    setConf(conf);
079  }
080  
081  public int run(String[] argv) throws Exception {
082    int exitCode = -1;
083    if (argv.length < 1) {
084      displayUsage("");
085      return exitCode;
086    }    
087    // process arguments
088    String cmd = argv[0];
089    String submitJobFile = null;
090    String jobid = null;
091    String taskid = null;
092    String historyFile = null;
093    String counterGroupName = null;
094    String counterName = null;
095    JobPriority jp = null;
096    String taskType = null;
097    String taskState = null;
098    int fromEvent = 0;
099    int nEvents = 0;
100    boolean getStatus = false;
101    boolean getCounter = false;
102    boolean killJob = false;
103    boolean listEvents = false;
104    boolean viewHistory = false;
105    boolean viewAllHistory = false;
106    boolean listJobs = false;
107    boolean listAllJobs = false;
108    boolean listActiveTrackers = false;
109    boolean listBlacklistedTrackers = false;
110    boolean displayTasks = false;
111    boolean killTask = false;
112    boolean failTask = false;
113    boolean setJobPriority = false;
114    boolean logs = false;
115
116    if ("-submit".equals(cmd)) {
117      if (argv.length != 2) {
118        displayUsage(cmd);
119        return exitCode;
120      }
121      submitJobFile = argv[1];
122    } else if ("-status".equals(cmd)) {
123      if (argv.length != 2) {
124        displayUsage(cmd);
125        return exitCode;
126      }
127      jobid = argv[1];
128      getStatus = true;
129    } else if("-counter".equals(cmd)) {
130      if (argv.length != 4) {
131        displayUsage(cmd);
132        return exitCode;
133      }
134      getCounter = true;
135      jobid = argv[1];
136      counterGroupName = argv[2];
137      counterName = argv[3];
138    } else if ("-kill".equals(cmd)) {
139      if (argv.length != 2) {
140        displayUsage(cmd);
141        return exitCode;
142      }
143      jobid = argv[1];
144      killJob = true;
145    } else if ("-set-priority".equals(cmd)) {
146      if (argv.length != 3) {
147        displayUsage(cmd);
148        return exitCode;
149      }
150      jobid = argv[1];
151      try {
152        jp = JobPriority.valueOf(argv[2]); 
153      } catch (IllegalArgumentException iae) {
154        LOG.info(iae);
155        displayUsage(cmd);
156        return exitCode;
157      }
158      setJobPriority = true; 
159    } else if ("-events".equals(cmd)) {
160      if (argv.length != 4) {
161        displayUsage(cmd);
162        return exitCode;
163      }
164      jobid = argv[1];
165      fromEvent = Integer.parseInt(argv[2]);
166      nEvents = Integer.parseInt(argv[3]);
167      listEvents = true;
168    } else if ("-history".equals(cmd)) {
169      if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
170         displayUsage(cmd);
171         return exitCode;
172      }
173      viewHistory = true;
174      if (argv.length == 3 && "all".equals(argv[1])) {
175        viewAllHistory = true;
176        historyFile = argv[2];
177      } else {
178        historyFile = argv[1];
179      }
180    } else if ("-list".equals(cmd)) {
181      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
182        displayUsage(cmd);
183        return exitCode;
184      }
185      if (argv.length == 2 && "all".equals(argv[1])) {
186        listAllJobs = true;
187      } else {
188        listJobs = true;
189      }
190    } else if("-kill-task".equals(cmd)) {
191      if (argv.length != 2) {
192        displayUsage(cmd);
193        return exitCode;
194      }
195      killTask = true;
196      taskid = argv[1];
197    } else if("-fail-task".equals(cmd)) {
198      if (argv.length != 2) {
199        displayUsage(cmd);
200        return exitCode;
201      }
202      failTask = true;
203      taskid = argv[1];
204    } else if ("-list-active-trackers".equals(cmd)) {
205      if (argv.length != 1) {
206        displayUsage(cmd);
207        return exitCode;
208      }
209      listActiveTrackers = true;
210    } else if ("-list-blacklisted-trackers".equals(cmd)) {
211      if (argv.length != 1) {
212        displayUsage(cmd);
213        return exitCode;
214      }
215      listBlacklistedTrackers = true;
216    } else if ("-list-attempt-ids".equals(cmd)) {
217      if (argv.length != 4) {
218        displayUsage(cmd);
219        return exitCode;
220      }
221      jobid = argv[1];
222      taskType = argv[2];
223      taskState = argv[3];
224      displayTasks = true;
225      if (!taskTypes.contains(taskType.toUpperCase())) {
226        System.out.println("Error: Invalid task-type: " + taskType);
227        displayUsage(cmd);
228        return exitCode;
229      }
230      if (!taskStates.contains(taskState.toLowerCase())) {
231        System.out.println("Error: Invalid task-state: " + taskState);
232        displayUsage(cmd);
233        return exitCode;
234      }
235    } else if ("-logs".equals(cmd)) {
236      if (argv.length == 2 || argv.length ==3) {
237        logs = true;
238        jobid = argv[1];
239        if (argv.length == 3) {
240          taskid = argv[2];
241        }  else {
242          taskid = null;
243        }
244      } else {
245        displayUsage(cmd);
246        return exitCode;
247      }
248    } else {
249      displayUsage(cmd);
250      return exitCode;
251    }
252
253    // initialize cluster
254    cluster = createCluster();
255        
256    // Submit the request
257    try {
258      if (submitJobFile != null) {
259        Job job = Job.getInstance(new JobConf(submitJobFile));
260        job.submit();
261        System.out.println("Created job " + job.getJobID());
262        exitCode = 0;
263      } else if (getStatus) {
264        Job job = cluster.getJob(JobID.forName(jobid));
265        if (job == null) {
266          System.out.println("Could not find job " + jobid);
267        } else {
268          Counters counters = job.getCounters();
269          System.out.println();
270          System.out.println(job);
271          if (counters != null) {
272            System.out.println(counters);
273          } else {
274            System.out.println("Counters not available. Job is retired.");
275          }
276          exitCode = 0;
277        }
278      } else if (getCounter) {
279        Job job = cluster.getJob(JobID.forName(jobid));
280        if (job == null) {
281          System.out.println("Could not find job " + jobid);
282        } else {
283          Counters counters = job.getCounters();
284          if (counters == null) {
285            System.out.println("Counters not available for retired job " + 
286            jobid);
287            exitCode = -1;
288          } else {
289            System.out.println(getCounter(counters,
290              counterGroupName, counterName));
291            exitCode = 0;
292          }
293        }
294      } else if (killJob) {
295        Job job = cluster.getJob(JobID.forName(jobid));
296        if (job == null) {
297          System.out.println("Could not find job " + jobid);
298        } else {
299          job.killJob();
300          System.out.println("Killed job " + jobid);
301          exitCode = 0;
302        }
303      } else if (setJobPriority) {
304        Job job = cluster.getJob(JobID.forName(jobid));
305        if (job == null) {
306          System.out.println("Could not find job " + jobid);
307        } else {
308          job.setPriority(jp);
309          System.out.println("Changed job priority.");
310          exitCode = 0;
311        } 
312      } else if (viewHistory) {
313        viewHistory(historyFile, viewAllHistory);
314        exitCode = 0;
315      } else if (listEvents) {
316        listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
317        exitCode = 0;
318      } else if (listJobs) {
319        listJobs(cluster);
320        exitCode = 0;
321      } else if (listAllJobs) {
322        listAllJobs(cluster);
323        exitCode = 0;
324      } else if (listActiveTrackers) {
325        listActiveTrackers(cluster);
326        exitCode = 0;
327      } else if (listBlacklistedTrackers) {
328        listBlacklistedTrackers(cluster);
329        exitCode = 0;
330      } else if (displayTasks) {
331        displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
332        exitCode = 0;
333      } else if(killTask) {
334        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
335        Job job = cluster.getJob(taskID.getJobID());
336        if (job == null) {
337          System.out.println("Could not find job " + jobid);
338        } else if (job.killTask(taskID, false)) {
339          System.out.println("Killed task " + taskid);
340          exitCode = 0;
341        } else {
342          System.out.println("Could not kill task " + taskid);
343          exitCode = -1;
344        }
345      } else if(failTask) {
346        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
347        Job job = cluster.getJob(taskID.getJobID());
348        if (job == null) {
349            System.out.println("Could not find job " + jobid);
350        } else if(job.killTask(taskID, true)) {
351          System.out.println("Killed task " + taskID + " by failing it");
352          exitCode = 0;
353        } else {
354          System.out.println("Could not fail task " + taskid);
355          exitCode = -1;
356        }
357      } else if (logs) {
358        try {
359        JobID jobID = JobID.forName(jobid);
360        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
361        LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
362        LogCLIHelpers logDumper = new LogCLIHelpers();
363        logDumper.setConf(getConf());
364        exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
365            logParams.getContainerId(), logParams.getNodeId(),
366            logParams.getOwner());
367        } catch (IOException e) {
368          if (e instanceof RemoteException) {
369            throw e;
370          } 
371          System.out.println(e.getMessage());
372        }
373      }
374    } catch (RemoteException re) {
375      IOException unwrappedException = re.unwrapRemoteException();
376      if (unwrappedException instanceof AccessControlException) {
377        System.out.println(unwrappedException.getMessage());
378      } else {
379        throw re;
380      }
381    } finally {
382      cluster.close();
383    }
384    return exitCode;
385  }
386
387  Cluster createCluster() throws IOException {
388    return new Cluster(getConf());
389  }
390
391  private String getJobPriorityNames() {
392    StringBuffer sb = new StringBuffer();
393    for (JobPriority p : JobPriority.values()) {
394      sb.append(p.name()).append(" ");
395    }
396    return sb.substring(0, sb.length()-1);
397  }
398
399  private String getTaskTypes() {
400    return StringUtils.join(taskTypes, " ");
401  }
402
403  /**
404   * Display usage of the command-line tool and terminate execution.
405   */
406  private void displayUsage(String cmd) {
407    String prefix = "Usage: CLI ";
408    String jobPriorityValues = getJobPriorityNames();
409    String taskStates = "running, completed";
410
411    if ("-submit".equals(cmd)) {
412      System.err.println(prefix + "[" + cmd + " <job-file>]");
413    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
414      System.err.println(prefix + "[" + cmd + " <job-id>]");
415    } else if ("-counter".equals(cmd)) {
416      System.err.println(prefix + "[" + cmd + 
417        " <job-id> <group-name> <counter-name>]");
418    } else if ("-events".equals(cmd)) {
419      System.err.println(prefix + "[" + cmd + 
420        " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
421    } else if ("-history".equals(cmd)) {
422      System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
423    } else if ("-list".equals(cmd)) {
424      System.err.println(prefix + "[" + cmd + " [all]]");
425    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
426      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
427    } else if ("-set-priority".equals(cmd)) {
428      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
429          "Valid values for priorities are: " 
430          + jobPriorityValues); 
431    } else if ("-list-active-trackers".equals(cmd)) {
432      System.err.println(prefix + "[" + cmd + "]");
433    } else if ("-list-blacklisted-trackers".equals(cmd)) {
434      System.err.println(prefix + "[" + cmd + "]");
435    } else if ("-list-attempt-ids".equals(cmd)) {
436      System.err.println(prefix + "[" + cmd + 
437          " <job-id> <task-type> <task-state>]. " +
438          "Valid values for <task-type> are " + getTaskTypes() + ". " +
439          "Valid values for <task-state> are " + taskStates);
440    } else if ("-logs".equals(cmd)) {
441      System.err.println(prefix + "[" + cmd +
442          " <job-id> <task-attempt-id>]. " +
443          " <task-attempt-id> is optional to get task attempt logs.");      
444    } else {
445      System.err.printf(prefix + "<command> <args>%n");
446      System.err.printf("\t[-submit <job-file>]%n");
447      System.err.printf("\t[-status <job-id>]%n");
448      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
449      System.err.printf("\t[-kill <job-id>]%n");
450      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
451        "Valid values for priorities are: " + jobPriorityValues + "%n");
452      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
453      System.err.printf("\t[-history <jobHistoryFile>]%n");
454      System.err.printf("\t[-list [all]]%n");
455      System.err.printf("\t[-list-active-trackers]%n");
456      System.err.printf("\t[-list-blacklisted-trackers]%n");
457      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
458        "<task-state>]. " +
459        "Valid values for <task-type> are " + getTaskTypes() + ". " +
460        "Valid values for <task-state> are " + taskStates);
461      System.err.printf("\t[-kill-task <task-attempt-id>]%n");
462      System.err.printf("\t[-fail-task <task-attempt-id>]%n");
463      System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
464      ToolRunner.printGenericCommandUsage(System.out);
465    }
466  }
467    
468  private void viewHistory(String historyFile, boolean all) 
469    throws IOException {
470    HistoryViewer historyViewer = new HistoryViewer(historyFile,
471                                        getConf(), all);
472    historyViewer.print();
473  }
474
475  protected long getCounter(Counters counters, String counterGroupName,
476      String counterName) throws IOException {
477    return counters.findCounter(counterGroupName, counterName).getValue();
478  }
479  
480  /**
481   * List the events for the given job
482   * @param jobId the job id for the job's events to list
483   * @throws IOException
484   */
485  private void listEvents(Job job, int fromEventId, int numEvents)
486      throws IOException, InterruptedException {
487    TaskCompletionEvent[] events = job.
488      getTaskCompletionEvents(fromEventId, numEvents);
489    System.out.println("Task completion events for " + job.getJobID());
490    System.out.println("Number of events (from " + fromEventId + ") are: " 
491      + events.length);
492    for(TaskCompletionEvent event: events) {
493      System.out.println(event.getStatus() + " " + 
494        event.getTaskAttemptId() + " " + 
495        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
496    }
497  }
498
499  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
500    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
501  }
502  
503
504  /**
505   * Dump a list of currently running jobs
506   * @throws IOException
507   */
508  private void listJobs(Cluster cluster) 
509      throws IOException, InterruptedException {
510    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
511    for (JobStatus job : cluster.getAllJobStatuses()) {
512      if (!job.isJobComplete()) {
513        runningJobs.add(job);
514      }
515    }
516    displayJobList(runningJobs.toArray(new JobStatus[0]));
517  }
518    
519  /**
520   * Dump a list of all jobs submitted.
521   * @throws IOException
522   */
523  private void listAllJobs(Cluster cluster) 
524      throws IOException, InterruptedException {
525    displayJobList(cluster.getAllJobStatuses());
526  }
527  
528  /**
529   * Display the list of active trackers
530   */
531  private void listActiveTrackers(Cluster cluster) 
532      throws IOException, InterruptedException {
533    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
534    for (TaskTrackerInfo tracker : trackers) {
535      System.out.println(tracker.getTaskTrackerName());
536    }
537  }
538
539  /**
540   * Display the list of blacklisted trackers
541   */
542  private void listBlacklistedTrackers(Cluster cluster) 
543      throws IOException, InterruptedException {
544    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
545    if (trackers.length > 0) {
546      System.out.println("BlackListedNode \t Reason");
547    }
548    for (TaskTrackerInfo tracker : trackers) {
549      System.out.println(tracker.getTaskTrackerName() + "\t" + 
550        tracker.getReasonForBlacklist());
551    }
552  }
553
554  private void printTaskAttempts(TaskReport report) {
555    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
556      System.out.println(report.getSuccessfulTaskAttemptId());
557    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
558      for (TaskAttemptID t : 
559        report.getRunningTaskAttemptIds()) {
560        System.out.println(t);
561      }
562    }
563  }
564
565  /**
566   * Display the information about a job's tasks, of a particular type and
567   * in a particular state
568   * 
569   * @param job the job
570   * @param type the type of the task (map/reduce/setup/cleanup)
571   * @param state the state of the task 
572   * (pending/running/completed/failed/killed)
573   */
574  protected void displayTasks(Job job, String type, String state) 
575  throws IOException, InterruptedException {
576    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
577    for (TaskReport report : reports) {
578      TIPStatus status = report.getCurrentStatus();
579      if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
580          (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
581          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
582          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
583          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
584        printTaskAttempts(report);
585      }
586    }
587  }
588
589  public void displayJobList(JobStatus[] jobs) 
590      throws IOException, InterruptedException {
591    displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
592        Charsets.UTF_8)));
593  }
594
595  @Private
596  public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
597  @Private
598  public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
599  private static String memPattern   = "%dM";
600  private static String UNAVAILABLE  = "N/A";
601
602  @Private
603  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
604    writer.println("Total jobs:" + jobs.length);
605    writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
606      "Queue", "Priority", "UsedContainers",
607      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
608    for (JobStatus job : jobs) {
609      int numUsedSlots = job.getNumUsedSlots();
610      int numReservedSlots = job.getNumReservedSlots();
611      int usedMem = job.getUsedMem();
612      int rsvdMem = job.getReservedMem();
613      int neededMem = job.getNeededMem();
614      writer.printf(dataPattern,
615          job.getJobID().toString(), job.getState(), job.getStartTime(),
616          job.getUsername(), job.getQueue(), 
617          job.getPriority().name(),
618          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
619          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
620          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
621          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
622          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
623          job.getSchedulingInfo());
624    }
625    writer.flush();
626  }
627  
628  public static void main(String[] argv) throws Exception {
629    int res = ToolRunner.run(new CLI(), argv);
630    ExitUtil.terminate(res);
631  }
632}