001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.IOException;
022    import java.nio.ByteBuffer;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Vector;
028    
029    import org.apache.commons.cli.CommandLine;
030    import org.apache.commons.cli.GnuParser;
031    import org.apache.commons.cli.HelpFormatter;
032    import org.apache.commons.cli.Option;
033    import org.apache.commons.cli.Options;
034    import org.apache.commons.cli.ParseException;
035    import org.apache.commons.io.IOUtils;
036    import org.apache.commons.lang.StringUtils;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    import org.apache.hadoop.classification.InterfaceAudience;
040    import org.apache.hadoop.classification.InterfaceStability;
041    import org.apache.hadoop.conf.Configuration;
042    import org.apache.hadoop.fs.FSDataOutputStream;
043    import org.apache.hadoop.fs.FileStatus;
044    import org.apache.hadoop.fs.FileSystem;
045    import org.apache.hadoop.fs.Path;
046    import org.apache.hadoop.fs.permission.FsPermission;
047    import org.apache.hadoop.io.DataOutputBuffer;
048    import org.apache.hadoop.security.Credentials;
049    import org.apache.hadoop.security.UserGroupInformation;
050    import org.apache.hadoop.security.token.Token;
051    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052    import org.apache.hadoop.yarn.api.ApplicationConstants;
053    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055    import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056    import org.apache.hadoop.yarn.api.records.ApplicationId;
057    import org.apache.hadoop.yarn.api.records.ApplicationReport;
058    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061    import org.apache.hadoop.yarn.api.records.LocalResource;
062    import org.apache.hadoop.yarn.api.records.LocalResourceType;
063    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064    import org.apache.hadoop.yarn.api.records.NodeReport;
065    import org.apache.hadoop.yarn.api.records.NodeState;
066    import org.apache.hadoop.yarn.api.records.Priority;
067    import org.apache.hadoop.yarn.api.records.QueueACL;
068    import org.apache.hadoop.yarn.api.records.QueueInfo;
069    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070    import org.apache.hadoop.yarn.api.records.Resource;
071    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
072    import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
073    import org.apache.hadoop.yarn.client.api.YarnClient;
074    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
075    import org.apache.hadoop.yarn.conf.YarnConfiguration;
076    import org.apache.hadoop.yarn.exceptions.YarnException;
077    import org.apache.hadoop.yarn.util.ConverterUtils;
078    import org.apache.hadoop.yarn.util.Records;
079    
080    /**
081     * Client for Distributed Shell application submission to YARN.
082     * 
083     * <p> The distributed shell client allows an application master to be launched that in turn would run 
084     * the provided shell command on a set of containers. </p>
085     * 
086     * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
087     * 
088     * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
089     * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
090     * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
092     * 
093     * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
094     * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
095     * and application name, the priority assigned to the application and the queue
096     * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
097     * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
098     * the {@link ApplicationMaster} is launched. </p>
099     * 
100     * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
101     * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
102     * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
 * {@link ApplicationMaster}. </p>
104     * 
105     * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
106     * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
107     * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
108     * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
109     *
110     */
111    @InterfaceAudience.Public
112    @InterfaceStability.Unstable
113    public class Client {
114    
  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration handed to the YARN client and used to resolve the file system
  private Configuration conf;
  // Client used to talk to YARN; created and initialized in the constructor
  private YarnClient yarnClient;
  // Application name to register with the RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue to which the application is submitted
  private String amQueue = "";
  // Amount of memory, in MB, to request for the container running the App Master
  private int amMemory = 10; 
  // Number of virtual cores to request for the container running the App Master
  private int amVCores = 1;

  // Application master jar file
  private String appMasterJar = ""; 
  // Main class to invoke application master
  private final String appMasterMainClass;

  // Shell command to be executed (mutually exclusive with shellScriptPath)
  private String shellCommand = ""; 
  // Location of shell script (mutually exclusive with shellCommand)
  private String shellScriptPath = ""; 
  // Args to be passed to the shell command
  private String[] shellArgs = new String[] {};
  // Env variables to be setup for the shell command 
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority 
  private int shellCmdPriority = 0;

  // Amount of memory, in MB, to request for each container in which the shell command is executed
  private int containerMemory = 10; 
  // Number of virtual cores to request for each container in which the shell command is executed
  private int containerVirtualCores = 1;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;

  // Custom log4j.properties file given on the command line;
  // if set, it is added to the local resources of the application
  private String log4jPropFile = "";    

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client, in milliseconds. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // flag to indicate whether to keep containers across application attempts.
  private boolean keepContainers = false;

  // Debug flag; when set, --debug is passed on to the application master
  boolean debugFlag = false;    

  // Command line options
  private Options opts;

  // Names handed to addToLocalResources for the shell command, the shell
  // arguments and the application master jar respectively
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded name under which a custom log4j properties file is added
  private static final String log4jPath = "log4j.properties";

  // Last path component of the destination to which the shell script is copied
  public static final String SCRIPT_PATH = "ExecScript";
179    
180      /**
181       * @param args Command line arguments 
182       */
183      public static void main(String[] args) {
184        boolean result = false;
185        try {
186          Client client = new Client();
187          LOG.info("Initializing Client");
188          try {
189            boolean doRun = client.init(args);
190            if (!doRun) {
191              System.exit(0);
192            }
193          } catch (IllegalArgumentException e) {
194            System.err.println(e.getLocalizedMessage());
195            client.printUsage();
196            System.exit(-1);
197          }
198          result = client.run();
199        } catch (Throwable t) {
200          LOG.fatal("Error running CLient", t);
201          System.exit(1);
202        }
203        if (result) {
204          LOG.info("Application completed successfully");
205          System.exit(0);                   
206        } 
207        LOG.error("Application failed to complete successfully");
208        System.exit(2);
209      }
210    
  /**
   * Creates a client that launches the default application master class,
   * {@code org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster}.
   *
   * @param conf Configuration used to initialize the YARN client
   * @throws Exception declared by the signature; the delegated constructor
   *         does not declare any checked exception itself
   */
  public Client(Configuration conf) throws Exception  {
    this(
      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
      conf);
  }
218    
219      Client(String appMasterMainClass, Configuration conf) {
220        this.conf = conf;
221        this.appMasterMainClass = appMasterMainClass;
222        yarnClient = YarnClient.createYarnClient();
223        yarnClient.init(conf);
224        opts = new Options();
225        opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
226        opts.addOption("priority", true, "Application Priority. Default 0");
227        opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
228        opts.addOption("timeout", true, "Application timeout in milliseconds");
229        opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
230        opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
231        opts.addOption("jar", true, "Jar file containing the application master");
232        opts.addOption("shell_command", true, "Shell command to be executed by " +
233            "the Application Master. Can only specify either --shell_command " +
234            "or --shell_script");
235        opts.addOption("shell_script", true, "Location of the shell script to be " +
236            "executed. Can only specify either --shell_command or --shell_script");
237        opts.addOption("shell_args", true, "Command line args for the shell script." +
238            "Multiple args can be separated by empty space.");
239        opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
240        opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
241        opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
242        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
243        opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
244        opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
245        opts.addOption("log_properties", true, "log4j.properties file");
246        opts.addOption("keep_containers_across_application_attempts", false,
247          "Flag to indicate whether to keep containers across application attempts." +
248          " If the flag is true, running containers will not be killed when" +
249          " application attempt fails and these containers will be retrieved by" +
250          " the new application attempt ");
251        opts.addOption("debug", false, "Dump out debug information");
252        opts.addOption("help", false, "Print usage");
253    
254      }
255    
  /**
   * Creates a client backed by a newly constructed {@link YarnConfiguration}.
   *
   * @throws Exception declared by the delegated constructor
   */
  public Client() throws Exception  {
    this(new YarnConfiguration());
  }
261    
262      /**
263       * Helper function to print out usage
264       */
265      private void printUsage() {
266        new HelpFormatter().printHelp("Client", opts);
267      }
268    
269      /**
270       * Parse command line options
271       * @param args Parsed command line options 
272       * @return Whether the init was successful to run the client
273       * @throws ParseException
274       */
275      public boolean init(String[] args) throws ParseException {
276    
277        CommandLine cliParser = new GnuParser().parse(opts, args);
278    
279        if (args.length == 0) {
280          throw new IllegalArgumentException("No args specified for client to initialize");
281        }
282    
283        if (cliParser.hasOption("log_properties")) {
284          String log4jPath = cliParser.getOptionValue("log_properties");
285          try {
286            Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
287          } catch (Exception e) {
288            LOG.warn("Can not set up custom log4j properties. " + e);
289          }
290        }
291    
292        if (cliParser.hasOption("help")) {
293          printUsage();
294          return false;
295        }
296    
297        if (cliParser.hasOption("debug")) {
298          debugFlag = true;
299    
300        }
301    
302        if (cliParser.hasOption("keep_containers_across_application_attempts")) {
303          LOG.info("keep_containers_across_application_attempts");
304          keepContainers = true;
305        }
306    
307        appName = cliParser.getOptionValue("appname", "DistributedShell");
308        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
309        amQueue = cliParser.getOptionValue("queue", "default");
310        amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));               
311        amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
312    
313        if (amMemory < 0) {
314          throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
315              + " Specified memory=" + amMemory);
316        }
317        if (amVCores < 0) {
318          throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
319              + " Specified virtual cores=" + amVCores);
320        }
321    
322        if (!cliParser.hasOption("jar")) {
323          throw new IllegalArgumentException("No jar file specified for application master");
324        }           
325    
326        appMasterJar = cliParser.getOptionValue("jar");
327    
328        if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
329          throw new IllegalArgumentException(
330              "No shell command or shell script specified to be executed by application master");
331        } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
332          throw new IllegalArgumentException("Can not specify shell_command option " +
333              "and shell_script option at the same time");
334        } else if (cliParser.hasOption("shell_command")) {
335          shellCommand = cliParser.getOptionValue("shell_command");
336        } else {
337          shellScriptPath = cliParser.getOptionValue("shell_script");
338        }
339        if (cliParser.hasOption("shell_args")) {
340          shellArgs = cliParser.getOptionValues("shell_args");
341        }
342        if (cliParser.hasOption("shell_env")) { 
343          String envs[] = cliParser.getOptionValues("shell_env");
344          for (String env : envs) {
345            env = env.trim();
346            int index = env.indexOf('=');
347            if (index == -1) {
348              shellEnv.put(env, "");
349              continue;
350            }
351            String key = env.substring(0, index);
352            String val = "";
353            if (index < (env.length()-1)) {
354              val = env.substring(index+1);
355            }
356            shellEnv.put(key, val);
357          }
358        }
359        shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
360    
361        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
362        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
363        numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
364    
365        if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
366          throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
367              + " exiting."
368              + " Specified containerMemory=" + containerMemory
369              + ", containerVirtualCores=" + containerVirtualCores
370              + ", numContainer=" + numContainers);
371        }
372    
373        clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
374    
375        log4jPropFile = cliParser.getOptionValue("log_properties", "");
376    
377        return true;
378      }
379    
380      /**
381       * Main run function for the client
382       * @return true if application completed successfully
383       * @throws IOException
384       * @throws YarnException
385       */
386      public boolean run() throws IOException, YarnException {
387    
388        LOG.info("Running Client");
389        yarnClient.start();
390    
391        YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
392        LOG.info("Got Cluster metric info from ASM" 
393            + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
394    
395        List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
396            NodeState.RUNNING);
397        LOG.info("Got Cluster node info from ASM");
398        for (NodeReport node : clusterNodeReports) {
399          LOG.info("Got node report from ASM for"
400              + ", nodeId=" + node.getNodeId() 
401              + ", nodeAddress" + node.getHttpAddress()
402              + ", nodeRackName" + node.getRackName()
403              + ", nodeNumContainers" + node.getNumContainers());
404        }
405    
406        QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
407        LOG.info("Queue info"
408            + ", queueName=" + queueInfo.getQueueName()
409            + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
410            + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
411            + ", queueApplicationCount=" + queueInfo.getApplications().size()
412            + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
413    
414        List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
415        for (QueueUserACLInfo aclInfo : listAclInfo) {
416          for (QueueACL userAcl : aclInfo.getUserAcls()) {
417            LOG.info("User ACL Info for Queue"
418                + ", queueName=" + aclInfo.getQueueName()                   
419                + ", userAcl=" + userAcl.name());
420          }
421        }           
422    
423        // Get a new application id
424        YarnClientApplication app = yarnClient.createApplication();
425        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
426        // TODO get min/max resource capabilities from RM and change memory ask if needed
427        // If we do not have min/max, we may not be able to correctly request 
428        // the required resources from the RM for the app master
429        // Memory ask has to be a multiple of min and less than max. 
430        // Dump out information about cluster capability as seen by the resource manager
431        int maxMem = appResponse.getMaximumResourceCapability().getMemory();
432        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
433    
434        // A resource ask cannot exceed the max. 
435        if (amMemory > maxMem) {
436          LOG.info("AM memory specified above max threshold of cluster. Using max value."
437              + ", specified=" + amMemory
438              + ", max=" + maxMem);
439          amMemory = maxMem;
440        }                           
441    
442        int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
443        LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);
444        
445        if (amVCores > maxVCores) {
446          LOG.info("AM virtual cores specified above max threshold of cluster. " 
447              + "Using max value." + ", specified=" + amVCores 
448              + ", max=" + maxVCores);
449          amVCores = maxVCores;
450        }
451        
452        // set the application name
453        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
454        ApplicationId appId = appContext.getApplicationId();
455    
456        appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
457        appContext.setApplicationName(appName);
458    
459        // Set up the container launch context for the application master
460        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
461    
462        // set local resources for the application master
463        // local files or archives as needed
464        // In this scenario, the jar file for the application master is part of the local resources                 
465        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
466    
467        LOG.info("Copy App Master jar from local filesystem and add to local environment");
468        // Copy the application master jar to the filesystem 
469        // Create a local resource to point to the destination jar path 
470        FileSystem fs = FileSystem.get(conf);
471        addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
472            localResources, null);
473    
474        // Set the log4j properties if needed 
475        if (!log4jPropFile.isEmpty()) {
476          addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
477              localResources, null);
478        }                   
479    
480        // The shell script has to be made available on the final container(s)
481        // where it will be executed. 
482        // To do this, we need to first copy into the filesystem that is visible 
483        // to the yarn framework. 
484        // We do not need to set this as a local resource for the application 
485        // master as the application master does not need it.               
486        String hdfsShellScriptLocation = ""; 
487        long hdfsShellScriptLen = 0;
488        long hdfsShellScriptTimestamp = 0;
489        if (!shellScriptPath.isEmpty()) {
490          Path shellSrc = new Path(shellScriptPath);
491          String shellPathSuffix =
492              appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
493          Path shellDst =
494              new Path(fs.getHomeDirectory(), shellPathSuffix);
495          fs.copyFromLocalFile(false, true, shellSrc, shellDst);
496          hdfsShellScriptLocation = shellDst.toUri().toString(); 
497          FileStatus shellFileStatus = fs.getFileStatus(shellDst);
498          hdfsShellScriptLen = shellFileStatus.getLen();
499          hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
500        }
501    
502        if (!shellCommand.isEmpty()) {
503          addToLocalResources(fs, null, shellCommandPath, appId.toString(),
504              localResources, shellCommand);
505        }
506    
507        if (shellArgs.length > 0) {
508          addToLocalResources(fs, null, shellArgsPath, appId.toString(),
509              localResources, StringUtils.join(shellArgs, " "));
510        }
511        // Set local resource info into app master container launch context
512        amContainer.setLocalResources(localResources);
513    
514        // Set the necessary security tokens as needed
515        //amContainer.setContainerTokens(containerToken);
516    
517        // Set the env variables to be setup in the env where the application master will be run
518        LOG.info("Set the environment for the application master");
519        Map<String, String> env = new HashMap<String, String>();
520    
521        // put location of shell script into env
522        // using the env info, the application master will create the correct local resource for the 
523        // eventual containers that will be launched to execute the shell scripts
524        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
525        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
526        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
527    
528        // Add AppMaster.jar location to classpath          
529        // At some point we should not be required to add 
530        // the hadoop specific classpaths to the env. 
531        // It should be provided out of the box. 
532        // For now setting all required classpaths including
533        // the classpath to "." for the application jar
534        StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
535          .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
536        for (String c : conf.getStrings(
537            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
538            YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
539          classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
540          classPathEnv.append(c.trim());
541        }
542        classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
543          "./log4j.properties");
544    
545        // add the runtime classpath needed for tests to work
546        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
547          classPathEnv.append(':');
548          classPathEnv.append(System.getProperty("java.class.path"));
549        }
550    
551        env.put("CLASSPATH", classPathEnv.toString());
552    
553        amContainer.setEnvironment(env);
554    
555        // Set the necessary command to execute the application master 
556        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
557    
558        // Set java executable command 
559        LOG.info("Setting up app master command");
560        vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
561        // Set Xmx based on am memory size
562        vargs.add("-Xmx" + amMemory + "m");
563        // Set class name 
564        vargs.add(appMasterMainClass);
565        // Set params for Application Master
566        vargs.add("--container_memory " + String.valueOf(containerMemory));
567        vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
568        vargs.add("--num_containers " + String.valueOf(numContainers));
569        vargs.add("--priority " + String.valueOf(shellCmdPriority));
570    
571        for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
572          vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
573        }                   
574        if (debugFlag) {
575          vargs.add("--debug");
576        }
577    
578        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
579        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
580    
581        // Get final commmand
582        StringBuilder command = new StringBuilder();
583        for (CharSequence str : vargs) {
584          command.append(str).append(" ");
585        }
586    
587        LOG.info("Completed setting up app master command " + command.toString());     
588        List<String> commands = new ArrayList<String>();
589        commands.add(command.toString());           
590        amContainer.setCommands(commands);
591    
592        // Set up resource type requirements
593        // For now, both memory and vcores are supported, so we set memory and 
594        // vcores requirements
595        Resource capability = Records.newRecord(Resource.class);
596        capability.setMemory(amMemory);
597        capability.setVirtualCores(amVCores);
598        appContext.setResource(capability);
599    
600        // Service data is a binary blob that can be passed to the application
601        // Not needed in this scenario
602        // amContainer.setServiceData(serviceData);
603    
604        // Setup security tokens
605        if (UserGroupInformation.isSecurityEnabled()) {
606          Credentials credentials = new Credentials();
607          String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
608          if (tokenRenewer == null || tokenRenewer.length() == 0) {
609            throw new IOException(
610              "Can't get Master Kerberos principal for the RM to use as renewer");
611          }
612    
613          // For now, only getting tokens for the default file-system.
614          final Token<?> tokens[] =
615              fs.addDelegationTokens(tokenRenewer, credentials);
616          if (tokens != null) {
617            for (Token<?> token : tokens) {
618              LOG.info("Got dt for " + fs.getUri() + "; " + token);
619            }
620          }
621          DataOutputBuffer dob = new DataOutputBuffer();
622          credentials.writeTokenStorageToStream(dob);
623          ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
624          amContainer.setTokens(fsTokens);
625        }
626    
627        appContext.setAMContainerSpec(amContainer);
628    
629        // Set the priority for the application master
630        Priority pri = Records.newRecord(Priority.class);
631        // TODO - what is the range for priority? how to decide? 
632        pri.setPriority(amPriority);
633        appContext.setPriority(pri);
634    
635        // Set the queue to which this application is to be submitted in the RM
636        appContext.setQueue(amQueue);
637    
638        // Submit the application to the applications manager
639        // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
640        // Ignore the response as either a valid response object is returned on success 
641        // or an exception thrown to denote some form of a failure
642        LOG.info("Submitting application to ASM");
643    
644        yarnClient.submitApplication(appContext);
645    
646        // TODO
647        // Try submitting the same request again
648        // app submission failure?
649    
650        // Monitor the application
651        return monitorApplication(appId);
652    
653      }
654    
655      /**
656       * Monitor the submitted application for completion. 
657       * Kill application if time expires. 
658       * @param appId Application Id of application to be monitored
659       * @return true if application completed successfully
660       * @throws YarnException
661       * @throws IOException
662       */
663      private boolean monitorApplication(ApplicationId appId)
664          throws YarnException, IOException {
665    
666        while (true) {
667    
668          // Check app status every 1 second.
669          try {
670            Thread.sleep(1000);
671          } catch (InterruptedException e) {
672            LOG.debug("Thread sleep in monitoring loop interrupted");
673          }
674    
675          // Get application report for the appId we are interested in 
676          ApplicationReport report = yarnClient.getApplicationReport(appId);
677    
678          LOG.info("Got application report from ASM for"
679              + ", appId=" + appId.getId()
680              + ", clientToAMToken=" + report.getClientToAMToken()
681              + ", appDiagnostics=" + report.getDiagnostics()
682              + ", appMasterHost=" + report.getHost()
683              + ", appQueue=" + report.getQueue()
684              + ", appMasterRpcPort=" + report.getRpcPort()
685              + ", appStartTime=" + report.getStartTime()
686              + ", yarnAppState=" + report.getYarnApplicationState().toString()
687              + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
688              + ", appTrackingUrl=" + report.getTrackingUrl()
689              + ", appUser=" + report.getUser());
690    
691          YarnApplicationState state = report.getYarnApplicationState();
692          FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
693          if (YarnApplicationState.FINISHED == state) {
694            if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
695              LOG.info("Application has completed successfully. Breaking monitoring loop");
696              return true;        
697            }
698            else {
699              LOG.info("Application did finished unsuccessfully."
700                  + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
701                  + ". Breaking monitoring loop");
702              return false;
703            }                         
704          }
705          else if (YarnApplicationState.KILLED == state     
706              || YarnApplicationState.FAILED == state) {
707            LOG.info("Application did not finish."
708                + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
709                + ". Breaking monitoring loop");
710            return false;
711          }                 
712    
713          if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
714            LOG.info("Reached client specified timeout for application. Killing application");
715            forceKillApplication(appId);
716            return false;                           
717          }
718        }                   
719    
720      }
721    
722      /**
723       * Kill a submitted application by sending a call to the ASM
724       * @param appId Application Id to be killed. 
725       * @throws YarnException
726       * @throws IOException
727       */
728      private void forceKillApplication(ApplicationId appId)
729          throws YarnException, IOException {
730        // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
731        // the same time. 
732        // If yes, can we kill a particular attempt only?
733    
734        // Response can be ignored as it is non-null on success or 
735        // throws an exception in case of failures
736        yarnClient.killApplication(appId);  
737      }
738    
739      private void addToLocalResources(FileSystem fs, String fileSrcPath,
740          String fileDstPath, String appId, Map<String, LocalResource> localResources,
741          String resources) throws IOException {
742        String suffix =
743            appName + "/" + appId + "/" + fileDstPath;
744        Path dst =
745            new Path(fs.getHomeDirectory(), suffix);
746        if (fileSrcPath == null) {
747          FSDataOutputStream ostream = null;
748          try {
749            ostream = FileSystem
750                .create(fs, dst, new FsPermission((short) 0710));
751            ostream.writeUTF(resources);
752          } finally {
753            IOUtils.closeQuietly(ostream);
754          }
755        } else {
756          fs.copyFromLocalFile(new Path(fileSrcPath), dst);
757        }
758        FileStatus scFileStatus = fs.getFileStatus(dst);
759        LocalResource scRsrc =
760            LocalResource.newInstance(
761                ConverterUtils.getYarnUrlFromURI(dst.toUri()),
762                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
763                scFileStatus.getLen(), scFileStatus.getModificationTime());
764        localResources.put(fileDstPath, scRsrc);
765      }
766    }