001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.tools;
019    
020    import java.io.IOException;
021    import java.io.OutputStreamWriter;
022    import java.io.PrintWriter;
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.HashSet;
026    import java.util.List;
027    import java.util.Set;
028    
029    import org.apache.commons.lang.StringUtils;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    import org.apache.hadoop.classification.InterfaceAudience;
033    import org.apache.hadoop.classification.InterfaceStability;
034    import org.apache.hadoop.classification.InterfaceAudience.Private;
035    import org.apache.hadoop.conf.Configuration;
036    import org.apache.hadoop.conf.Configured;
037    import org.apache.hadoop.ipc.RemoteException;
038    import org.apache.hadoop.mapred.JobConf;
039    import org.apache.hadoop.mapred.TIPStatus;
040    import org.apache.hadoop.mapreduce.Cluster;
041    import org.apache.hadoop.mapreduce.Counters;
042    import org.apache.hadoop.mapreduce.Job;
043    import org.apache.hadoop.mapreduce.JobID;
044    import org.apache.hadoop.mapreduce.JobPriority;
045    import org.apache.hadoop.mapreduce.JobStatus;
046    import org.apache.hadoop.mapreduce.TaskAttemptID;
047    import org.apache.hadoop.mapreduce.TaskCompletionEvent;
048    import org.apache.hadoop.mapreduce.TaskReport;
049    import org.apache.hadoop.mapreduce.TaskTrackerInfo;
050    import org.apache.hadoop.mapreduce.TaskType;
051    import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
052    import org.apache.hadoop.mapreduce.v2.LogParams;
053    import org.apache.hadoop.security.AccessControlException;
054    import org.apache.hadoop.util.ExitUtil;
055    import org.apache.hadoop.util.Tool;
056    import org.apache.hadoop.util.ToolRunner;
057    import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
058    
059    import com.google.common.base.Charsets;
060    
061    /**
062     * Interprets the map reduce cli options 
063     */
064    @InterfaceAudience.Public
065    @InterfaceStability.Stable
066    public class CLI extends Configured implements Tool {
067      private static final Log LOG = LogFactory.getLog(CLI.class);
068      protected Cluster cluster;
069      private static final Set<String> taskTypes = new HashSet<String>(
070          Arrays.asList("MAP", "REDUCE"));
071      private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
072          "running", "completed", "pending", "failed", "killed"));
073     
074      public CLI() {
075      }
076      
077      public CLI(Configuration conf) {
078        setConf(conf);
079      }
080      
081      public int run(String[] argv) throws Exception {
082        int exitCode = -1;
083        if (argv.length < 1) {
084          displayUsage("");
085          return exitCode;
086        }    
087        // process arguments
088        String cmd = argv[0];
089        String submitJobFile = null;
090        String jobid = null;
091        String taskid = null;
092        String historyFile = null;
093        String counterGroupName = null;
094        String counterName = null;
095        JobPriority jp = null;
096        String taskType = null;
097        String taskState = null;
098        int fromEvent = 0;
099        int nEvents = 0;
100        boolean getStatus = false;
101        boolean getCounter = false;
102        boolean killJob = false;
103        boolean listEvents = false;
104        boolean viewHistory = false;
105        boolean viewAllHistory = false;
106        boolean listJobs = false;
107        boolean listAllJobs = false;
108        boolean listActiveTrackers = false;
109        boolean listBlacklistedTrackers = false;
110        boolean displayTasks = false;
111        boolean killTask = false;
112        boolean failTask = false;
113        boolean setJobPriority = false;
114        boolean logs = false;
115    
116        if ("-submit".equals(cmd)) {
117          if (argv.length != 2) {
118            displayUsage(cmd);
119            return exitCode;
120          }
121          submitJobFile = argv[1];
122        } else if ("-status".equals(cmd)) {
123          if (argv.length != 2) {
124            displayUsage(cmd);
125            return exitCode;
126          }
127          jobid = argv[1];
128          getStatus = true;
129        } else if("-counter".equals(cmd)) {
130          if (argv.length != 4) {
131            displayUsage(cmd);
132            return exitCode;
133          }
134          getCounter = true;
135          jobid = argv[1];
136          counterGroupName = argv[2];
137          counterName = argv[3];
138        } else if ("-kill".equals(cmd)) {
139          if (argv.length != 2) {
140            displayUsage(cmd);
141            return exitCode;
142          }
143          jobid = argv[1];
144          killJob = true;
145        } else if ("-set-priority".equals(cmd)) {
146          if (argv.length != 3) {
147            displayUsage(cmd);
148            return exitCode;
149          }
150          jobid = argv[1];
151          try {
152            jp = JobPriority.valueOf(argv[2]); 
153          } catch (IllegalArgumentException iae) {
154            LOG.info(iae);
155            displayUsage(cmd);
156            return exitCode;
157          }
158          setJobPriority = true; 
159        } else if ("-events".equals(cmd)) {
160          if (argv.length != 4) {
161            displayUsage(cmd);
162            return exitCode;
163          }
164          jobid = argv[1];
165          fromEvent = Integer.parseInt(argv[2]);
166          nEvents = Integer.parseInt(argv[3]);
167          listEvents = true;
168        } else if ("-history".equals(cmd)) {
169          if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
170             displayUsage(cmd);
171             return exitCode;
172          }
173          viewHistory = true;
174          if (argv.length == 3 && "all".equals(argv[1])) {
175            viewAllHistory = true;
176            historyFile = argv[2];
177          } else {
178            historyFile = argv[1];
179          }
180        } else if ("-list".equals(cmd)) {
181          if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
182            displayUsage(cmd);
183            return exitCode;
184          }
185          if (argv.length == 2 && "all".equals(argv[1])) {
186            listAllJobs = true;
187          } else {
188            listJobs = true;
189          }
190        } else if("-kill-task".equals(cmd)) {
191          if (argv.length != 2) {
192            displayUsage(cmd);
193            return exitCode;
194          }
195          killTask = true;
196          taskid = argv[1];
197        } else if("-fail-task".equals(cmd)) {
198          if (argv.length != 2) {
199            displayUsage(cmd);
200            return exitCode;
201          }
202          failTask = true;
203          taskid = argv[1];
204        } else if ("-list-active-trackers".equals(cmd)) {
205          if (argv.length != 1) {
206            displayUsage(cmd);
207            return exitCode;
208          }
209          listActiveTrackers = true;
210        } else if ("-list-blacklisted-trackers".equals(cmd)) {
211          if (argv.length != 1) {
212            displayUsage(cmd);
213            return exitCode;
214          }
215          listBlacklistedTrackers = true;
216        } else if ("-list-attempt-ids".equals(cmd)) {
217          if (argv.length != 4) {
218            displayUsage(cmd);
219            return exitCode;
220          }
221          jobid = argv[1];
222          taskType = argv[2];
223          taskState = argv[3];
224          displayTasks = true;
225          if (!taskTypes.contains(taskType.toUpperCase())) {
226            System.out.println("Error: Invalid task-type: " + taskType);
227            displayUsage(cmd);
228            return exitCode;
229          }
230          if (!taskStates.contains(taskState.toLowerCase())) {
231            System.out.println("Error: Invalid task-state: " + taskState);
232            displayUsage(cmd);
233            return exitCode;
234          }
235        } else if ("-logs".equals(cmd)) {
236          if (argv.length == 2 || argv.length ==3) {
237            logs = true;
238            jobid = argv[1];
239            if (argv.length == 3) {
240              taskid = argv[2];
241            }  else {
242              taskid = null;
243            }
244          } else {
245            displayUsage(cmd);
246            return exitCode;
247          }
248        } else {
249          displayUsage(cmd);
250          return exitCode;
251        }
252    
253        // initialize cluster
254        cluster = createCluster();
255            
256        // Submit the request
257        try {
258          if (submitJobFile != null) {
259            Job job = Job.getInstance(new JobConf(submitJobFile));
260            job.submit();
261            System.out.println("Created job " + job.getJobID());
262            exitCode = 0;
263          } else if (getStatus) {
264            Job job = cluster.getJob(JobID.forName(jobid));
265            if (job == null) {
266              System.out.println("Could not find job " + jobid);
267            } else {
268              Counters counters = job.getCounters();
269              System.out.println();
270              System.out.println(job);
271              if (counters != null) {
272                System.out.println(counters);
273              } else {
274                System.out.println("Counters not available. Job is retired.");
275              }
276              exitCode = 0;
277            }
278          } else if (getCounter) {
279            Job job = cluster.getJob(JobID.forName(jobid));
280            if (job == null) {
281              System.out.println("Could not find job " + jobid);
282            } else {
283              Counters counters = job.getCounters();
284              if (counters == null) {
285                System.out.println("Counters not available for retired job " + 
286                jobid);
287                exitCode = -1;
288              } else {
289                System.out.println(getCounter(counters,
290                  counterGroupName, counterName));
291                exitCode = 0;
292              }
293            }
294          } else if (killJob) {
295            Job job = cluster.getJob(JobID.forName(jobid));
296            if (job == null) {
297              System.out.println("Could not find job " + jobid);
298            } else {
299              job.killJob();
300              System.out.println("Killed job " + jobid);
301              exitCode = 0;
302            }
303          } else if (setJobPriority) {
304            Job job = cluster.getJob(JobID.forName(jobid));
305            if (job == null) {
306              System.out.println("Could not find job " + jobid);
307            } else {
308              job.setPriority(jp);
309              System.out.println("Changed job priority.");
310              exitCode = 0;
311            } 
312          } else if (viewHistory) {
313            viewHistory(historyFile, viewAllHistory);
314            exitCode = 0;
315          } else if (listEvents) {
316            listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
317            exitCode = 0;
318          } else if (listJobs) {
319            listJobs(cluster);
320            exitCode = 0;
321          } else if (listAllJobs) {
322            listAllJobs(cluster);
323            exitCode = 0;
324          } else if (listActiveTrackers) {
325            listActiveTrackers(cluster);
326            exitCode = 0;
327          } else if (listBlacklistedTrackers) {
328            listBlacklistedTrackers(cluster);
329            exitCode = 0;
330          } else if (displayTasks) {
331            displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
332            exitCode = 0;
333          } else if(killTask) {
334            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
335            Job job = cluster.getJob(taskID.getJobID());
336            if (job == null) {
337              System.out.println("Could not find job " + jobid);
338            } else if (job.killTask(taskID, false)) {
339              System.out.println("Killed task " + taskid);
340              exitCode = 0;
341            } else {
342              System.out.println("Could not kill task " + taskid);
343              exitCode = -1;
344            }
345          } else if(failTask) {
346            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
347            Job job = cluster.getJob(taskID.getJobID());
348            if (job == null) {
349                System.out.println("Could not find job " + jobid);
350            } else if(job.killTask(taskID, true)) {
351              System.out.println("Killed task " + taskID + " by failing it");
352              exitCode = 0;
353            } else {
354              System.out.println("Could not fail task " + taskid);
355              exitCode = -1;
356            }
357          } else if (logs) {
358            try {
359            JobID jobID = JobID.forName(jobid);
360            TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
361            LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
362            LogCLIHelpers logDumper = new LogCLIHelpers();
363            logDumper.setConf(getConf());
364            exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
365                logParams.getContainerId(), logParams.getNodeId(),
366                logParams.getOwner());
367            } catch (IOException e) {
368              if (e instanceof RemoteException) {
369                throw e;
370              } 
371              System.out.println(e.getMessage());
372            }
373          }
374        } catch (RemoteException re) {
375          IOException unwrappedException = re.unwrapRemoteException();
376          if (unwrappedException instanceof AccessControlException) {
377            System.out.println(unwrappedException.getMessage());
378          } else {
379            throw re;
380          }
381        } finally {
382          cluster.close();
383        }
384        return exitCode;
385      }
386    
387      Cluster createCluster() throws IOException {
388        return new Cluster(getConf());
389      }
390    
391      private String getJobPriorityNames() {
392        StringBuffer sb = new StringBuffer();
393        for (JobPriority p : JobPriority.values()) {
394          sb.append(p.name()).append(" ");
395        }
396        return sb.substring(0, sb.length()-1);
397      }
398    
399      private String getTaskTypes() {
400        return StringUtils.join(taskTypes, " ");
401      }
402    
403      /**
404       * Display usage of the command-line tool and terminate execution.
405       */
406      private void displayUsage(String cmd) {
407        String prefix = "Usage: CLI ";
408        String jobPriorityValues = getJobPriorityNames();
409        String taskStates = "running, completed";
410    
411        if ("-submit".equals(cmd)) {
412          System.err.println(prefix + "[" + cmd + " <job-file>]");
413        } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
414          System.err.println(prefix + "[" + cmd + " <job-id>]");
415        } else if ("-counter".equals(cmd)) {
416          System.err.println(prefix + "[" + cmd + 
417            " <job-id> <group-name> <counter-name>]");
418        } else if ("-events".equals(cmd)) {
419          System.err.println(prefix + "[" + cmd + 
420            " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
421        } else if ("-history".equals(cmd)) {
422          System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
423        } else if ("-list".equals(cmd)) {
424          System.err.println(prefix + "[" + cmd + " [all]]");
425        } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
426          System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
427        } else if ("-set-priority".equals(cmd)) {
428          System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
429              "Valid values for priorities are: " 
430              + jobPriorityValues); 
431        } else if ("-list-active-trackers".equals(cmd)) {
432          System.err.println(prefix + "[" + cmd + "]");
433        } else if ("-list-blacklisted-trackers".equals(cmd)) {
434          System.err.println(prefix + "[" + cmd + "]");
435        } else if ("-list-attempt-ids".equals(cmd)) {
436          System.err.println(prefix + "[" + cmd + 
437              " <job-id> <task-type> <task-state>]. " +
438              "Valid values for <task-type> are " + getTaskTypes() + ". " +
439              "Valid values for <task-state> are " + taskStates);
440        } else if ("-logs".equals(cmd)) {
441          System.err.println(prefix + "[" + cmd +
442              " <job-id> <task-attempt-id>]. " +
443              " <task-attempt-id> is optional to get task attempt logs.");      
444        } else {
445          System.err.printf(prefix + "<command> <args>%n");
446          System.err.printf("\t[-submit <job-file>]%n");
447          System.err.printf("\t[-status <job-id>]%n");
448          System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
449          System.err.printf("\t[-kill <job-id>]%n");
450          System.err.printf("\t[-set-priority <job-id> <priority>]. " +
451            "Valid values for priorities are: " + jobPriorityValues + "%n");
452          System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
453          System.err.printf("\t[-history <jobHistoryFile>]%n");
454          System.err.printf("\t[-list [all]]%n");
455          System.err.printf("\t[-list-active-trackers]%n");
456          System.err.printf("\t[-list-blacklisted-trackers]%n");
457          System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
458            "<task-state>]. " +
459            "Valid values for <task-type> are " + getTaskTypes() + ". " +
460            "Valid values for <task-state> are " + taskStates);
461          System.err.printf("\t[-kill-task <task-attempt-id>]%n");
462          System.err.printf("\t[-fail-task <task-attempt-id>]%n");
463          System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
464          ToolRunner.printGenericCommandUsage(System.out);
465        }
466      }
467        
468      private void viewHistory(String historyFile, boolean all) 
469        throws IOException {
470        HistoryViewer historyViewer = new HistoryViewer(historyFile,
471                                            getConf(), all);
472        historyViewer.print();
473      }
474    
475      protected long getCounter(Counters counters, String counterGroupName,
476          String counterName) throws IOException {
477        return counters.findCounter(counterGroupName, counterName).getValue();
478      }
479      
480      /**
481       * List the events for the given job
482       * @param jobId the job id for the job's events to list
483       * @throws IOException
484       */
485      private void listEvents(Job job, int fromEventId, int numEvents)
486          throws IOException, InterruptedException {
487        TaskCompletionEvent[] events = job.
488          getTaskCompletionEvents(fromEventId, numEvents);
489        System.out.println("Task completion events for " + job.getJobID());
490        System.out.println("Number of events (from " + fromEventId + ") are: " 
491          + events.length);
492        for(TaskCompletionEvent event: events) {
493          System.out.println(event.getStatus() + " " + 
494            event.getTaskAttemptId() + " " + 
495            getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
496        }
497      }
498    
499      protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
500        return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
501      }
502      
503    
504      /**
505       * Dump a list of currently running jobs
506       * @throws IOException
507       */
508      private void listJobs(Cluster cluster) 
509          throws IOException, InterruptedException {
510        List<JobStatus> runningJobs = new ArrayList<JobStatus>();
511        for (JobStatus job : cluster.getAllJobStatuses()) {
512          if (!job.isJobComplete()) {
513            runningJobs.add(job);
514          }
515        }
516        displayJobList(runningJobs.toArray(new JobStatus[0]));
517      }
518        
519      /**
520       * Dump a list of all jobs submitted.
521       * @throws IOException
522       */
523      private void listAllJobs(Cluster cluster) 
524          throws IOException, InterruptedException {
525        displayJobList(cluster.getAllJobStatuses());
526      }
527      
528      /**
529       * Display the list of active trackers
530       */
531      private void listActiveTrackers(Cluster cluster) 
532          throws IOException, InterruptedException {
533        TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
534        for (TaskTrackerInfo tracker : trackers) {
535          System.out.println(tracker.getTaskTrackerName());
536        }
537      }
538    
539      /**
540       * Display the list of blacklisted trackers
541       */
542      private void listBlacklistedTrackers(Cluster cluster) 
543          throws IOException, InterruptedException {
544        TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
545        if (trackers.length > 0) {
546          System.out.println("BlackListedNode \t Reason");
547        }
548        for (TaskTrackerInfo tracker : trackers) {
549          System.out.println(tracker.getTaskTrackerName() + "\t" + 
550            tracker.getReasonForBlacklist());
551        }
552      }
553    
554      private void printTaskAttempts(TaskReport report) {
555        if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
556          System.out.println(report.getSuccessfulTaskAttemptId());
557        } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
558          for (TaskAttemptID t : 
559            report.getRunningTaskAttemptIds()) {
560            System.out.println(t);
561          }
562        }
563      }
564    
565      /**
566       * Display the information about a job's tasks, of a particular type and
567       * in a particular state
568       * 
569       * @param job the job
570       * @param type the type of the task (map/reduce/setup/cleanup)
571       * @param state the state of the task 
572       * (pending/running/completed/failed/killed)
573       */
574      protected void displayTasks(Job job, String type, String state) 
575      throws IOException, InterruptedException {
576        TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
577        for (TaskReport report : reports) {
578          TIPStatus status = report.getCurrentStatus();
579          if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
580              (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
581              (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
582              (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
583              (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
584            printTaskAttempts(report);
585          }
586        }
587      }
588    
589      public void displayJobList(JobStatus[] jobs) 
590          throws IOException, InterruptedException {
591        displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
592            Charsets.UTF_8)));
593      }
594    
595      @Private
596      public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
597      @Private
598      public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
599      private static String memPattern   = "%dM";
600      private static String UNAVAILABLE  = "N/A";
601    
602      @Private
603      public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
604        writer.println("Total jobs:" + jobs.length);
605        writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
606          "Queue", "Priority", "UsedContainers",
607          "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
608        for (JobStatus job : jobs) {
609          int numUsedSlots = job.getNumUsedSlots();
610          int numReservedSlots = job.getNumReservedSlots();
611          int usedMem = job.getUsedMem();
612          int rsvdMem = job.getReservedMem();
613          int neededMem = job.getNeededMem();
614          writer.printf(dataPattern,
615              job.getJobID().toString(), job.getState(), job.getStartTime(),
616              job.getUsername(), job.getQueue(), 
617              job.getPriority().name(),
618              numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
619              numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
620              usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
621              rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
622              neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
623              job.getSchedulingInfo());
624        }
625        writer.flush();
626      }
627      
628      public static void main(String[] argv) throws Exception {
629        int res = ToolRunner.run(new CLI(), argv);
630        ExitUtil.terminate(res);
631      }
632    }