001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.IOException;
022    import java.nio.ByteBuffer;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Vector;
028    
029    import org.apache.commons.cli.CommandLine;
030    import org.apache.commons.cli.GnuParser;
031    import org.apache.commons.cli.HelpFormatter;
032    import org.apache.commons.cli.Option;
033    import org.apache.commons.cli.Options;
034    import org.apache.commons.cli.ParseException;
035    import org.apache.commons.io.IOUtils;
036    import org.apache.commons.lang.StringUtils;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    import org.apache.hadoop.classification.InterfaceAudience;
040    import org.apache.hadoop.classification.InterfaceStability;
041    import org.apache.hadoop.conf.Configuration;
042    import org.apache.hadoop.fs.FSDataOutputStream;
043    import org.apache.hadoop.fs.FileStatus;
044    import org.apache.hadoop.fs.FileSystem;
045    import org.apache.hadoop.fs.Path;
046    import org.apache.hadoop.fs.permission.FsPermission;
047    import org.apache.hadoop.io.DataOutputBuffer;
048    import org.apache.hadoop.security.Credentials;
049    import org.apache.hadoop.security.UserGroupInformation;
050    import org.apache.hadoop.security.token.Token;
051    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052    import org.apache.hadoop.yarn.api.ApplicationConstants;
053    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055    import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056    import org.apache.hadoop.yarn.api.records.ApplicationId;
057    import org.apache.hadoop.yarn.api.records.ApplicationReport;
058    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061    import org.apache.hadoop.yarn.api.records.LocalResource;
062    import org.apache.hadoop.yarn.api.records.LocalResourceType;
063    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064    import org.apache.hadoop.yarn.api.records.NodeReport;
065    import org.apache.hadoop.yarn.api.records.NodeState;
066    import org.apache.hadoop.yarn.api.records.Priority;
067    import org.apache.hadoop.yarn.api.records.QueueACL;
068    import org.apache.hadoop.yarn.api.records.QueueInfo;
069    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070    import org.apache.hadoop.yarn.api.records.Resource;
071    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
072    import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
073    import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
074    import org.apache.hadoop.yarn.client.api.TimelineClient;
075    import org.apache.hadoop.yarn.client.api.YarnClient;
076    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
077    import org.apache.hadoop.yarn.conf.YarnConfiguration;
078    import org.apache.hadoop.yarn.exceptions.YarnException;
079    import org.apache.hadoop.yarn.util.ConverterUtils;
080    import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
081    
/**
 * Client for Distributed Shell application submission to YARN.
 * 
 * <p> The distributed shell client launches an application master that, in turn, runs
 * the provided shell command on a set of containers. </p>
 * 
 * <p> This client is meant to serve as an example of how to write YARN-based applications. </p>
 * 
 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
 * provides a way for the client to get access to cluster information and to request a
 * new {@link ApplicationId}. </p>
 * 
 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
 * The {@link ApplicationSubmissionContext} defines the application details such as the {@link ApplicationId}, 
 * the application name, the priority assigned to the application and the queue
 * to which the application should be submitted. In addition, the {@link ApplicationSubmissionContext}
 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
 * the {@link ApplicationMaster} is launched. </p>
 * 
 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
 * to the {@link ApplicationMaster}, the environment to be set for the {@link ApplicationMaster}, and the 
 * commands to be executed to run the {@link ApplicationMaster}. </p>
 * 
 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
 * <code>ResourceManager</code> and then monitors the application by requesting an {@link ApplicationReport} 
 * from the <code>ResourceManager</code> at regular intervals. If the application takes too long, the client 
 * kills it by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
 *
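 * <p> The snippet below is a minimal, illustrative sketch of programmatic use of this client;
 * exception handling is omitted, and the jar path, shell command and container count are
 * placeholders: </p>
 *
 * <pre>{@code
 * Configuration conf = new YarnConfiguration();
 * Client client = new Client(conf);
 * boolean doRun = client.init(new String[] {
 *     "-jar", "/path/to/distributedshell.jar",
 *     "-shell_command", "date",
 *     "-num_containers", "2"});
 * if (doRun) {
 *   boolean success = client.run();
 * }
 * }</pre>
 *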
112     */
113    @InterfaceAudience.Public
114    @InterfaceStability.Unstable
115    public class Client {
116    
117      private static final Log LOG = LogFactory.getLog(Client.class);
118      
119      // Configuration
120      private Configuration conf;
121      private YarnClient yarnClient;
122      // Application master specific info to register a new Application with RM/ASM
123      private String appName = "";
124      // App master priority
125      private int amPriority = 0;
126      // Queue for App master
127      private String amQueue = "";
  // Amount of memory in MB to request for running the App Master
  private int amMemory = 10;
  // Number of virtual cores to request for running the App Master
  private int amVCores = 1;
132    
133      // Application master jar file
134      private String appMasterJar = ""; 
135      // Main class to invoke application master
136      private final String appMasterMainClass;
137    
138      // Shell command to be executed 
139      private String shellCommand = ""; 
140      // Location of shell script 
141      private String shellScriptPath = ""; 
142      // Args to be passed to the shell command
143      private String[] shellArgs = new String[] {};
  // Env variables to be set up for the shell command
145      private Map<String, String> shellEnv = new HashMap<String, String>();
146      // Shell Command Container priority 
147      private int shellCmdPriority = 0;
148    
  // Amount of memory in MB to request for the container in which the shell script will be executed
  private int containerMemory = 10;
  // Number of virtual cores to request for the container in which the shell script will be executed
  private int containerVirtualCores = 1;
153      // No. of containers in which the shell script needs to be executed
154      private int numContainers = 1;
155      private String nodeLabelExpression = null;
156    
157      // log4j.properties file 
158      // if available, add to local resources and set into classpath 
159      private String log4jPropFile = "";    
160    
161      // Start time for client
162      private final long clientStartTime = System.currentTimeMillis();
163      // Timeout threshold for client. Kill app after time interval expires.
164      private long clientTimeout = 600000;
165    
166      // flag to indicate whether to keep containers across application attempts.
167      private boolean keepContainers = false;
168    
169      private long attemptFailuresValidityInterval = -1;
170    
171      // Debug flag
172      boolean debugFlag = false;
173    
174      // Timeline domain ID
175      private String domainId = null;
176    
177      // Flag to indicate whether to create the domain of the given ID
178      private boolean toCreateDomain = false;
179    
180      // Timeline domain reader access control
181      private String viewACLs = null;
182    
183      // Timeline domain writer access control
184      private String modifyACLs = null;
185    
186      // Command line options
187      private Options opts;
188    
189      private static final String shellCommandPath = "shellCommands";
190      private static final String shellArgsPath = "shellArgs";
191      private static final String appMasterJarPath = "AppMaster.jar";
192      // Hardcoded path to custom log_properties
193      private static final String log4jPath = "log4j.properties";
194    
195      public static final String SCRIPT_PATH = "ExecScript";
196    
197      /**
198       * @param args Command line arguments 
199       */
200      public static void main(String[] args) {
201        boolean result = false;
202        try {
203          Client client = new Client();
204          LOG.info("Initializing Client");
205          try {
206            boolean doRun = client.init(args);
207            if (!doRun) {
208              System.exit(0);
209            }
210          } catch (IllegalArgumentException e) {
211            System.err.println(e.getLocalizedMessage());
212            client.printUsage();
213            System.exit(-1);
214          }
215          result = client.run();
216        } catch (Throwable t) {
217          LOG.fatal("Error running Client", t);
218          System.exit(1);
219        }
220        if (result) {
221          LOG.info("Application completed successfully");
222          System.exit(0);                   
223        } 
224        LOG.error("Application failed to complete successfully");
225        System.exit(2);
226      }
227    
  /**
   * Create a new Client that will launch the default ApplicationMaster
   * shipped with the distributed shell example.
   * @param conf configuration used to connect to the cluster
   */
230      public Client(Configuration conf) throws Exception  {
231        this(
232          "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
233          conf);
234      }
235    
236      Client(String appMasterMainClass, Configuration conf) {
237        this.conf = conf;
238        this.appMasterMainClass = appMasterMainClass;
239        yarnClient = YarnClient.createYarnClient();
240        yarnClient.init(conf);
241        opts = new Options();
242        opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
243        opts.addOption("priority", true, "Application Priority. Default 0");
244        opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
245        opts.addOption("timeout", true, "Application timeout in milliseconds");
246        opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
247        opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
248        opts.addOption("jar", true, "Jar file containing the application master");
249        opts.addOption("shell_command", true, "Shell command to be executed by " +
250            "the Application Master. Can only specify either --shell_command " +
251            "or --shell_script");
252        opts.addOption("shell_script", true, "Location of the shell script to be " +
253            "executed. Can only specify either --shell_command or --shell_script");
    opts.addOption("shell_args", true, "Command line args for the shell script. " +
        "Multiple args can be separated by empty space.");
256        opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
257        opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
258        opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
259        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
260        opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
261        opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
262        opts.addOption("log_properties", true, "log4j.properties file");
    opts.addOption("keep_containers_across_application_attempts", false,
      "Flag to indicate whether to keep containers across application attempts." +
      " If the flag is true, running containers will not be killed when the" +
      " application attempt fails and they will be retrieved by" +
      " the new application attempt.");
    opts.addOption("attempt_failures_validity_interval", true,
      "When attempt_failures_validity_interval (in milliseconds) is set to a value > 0, " +
      "failures that happened outside the validity interval are not counted " +
      "toward the failure count. " +
      "If the failure count reaches maxAppAttempts, " +
      "the application fails.");
274        opts.addOption("debug", false, "Dump out debug information");
275        opts.addOption("domain", true, "ID of the timeline domain where the "
276            + "timeline entities will be put");
    opts.addOption("view_acls", true, "Users and groups that are allowed to "
        + "view the timeline entities in the given domain");
    opts.addOption("modify_acls", true, "Users and groups that are allowed to "
        + "modify the timeline entities in the given domain");
281        opts.addOption("create", false, "Flag to indicate whether to create the "
282            + "domain specified with -domain.");
283        opts.addOption("help", false, "Print usage");
    opts.addOption("node_label_expression", true,
        "Node label expression to determine the nodes"
            + " where all the containers of this application"
            + " will be allocated. \"\" means containers"
            + " can be allocated anywhere. If you don't specify the option,"
            + " the default node_label_expression of the queue will be used.");
290      }
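
  // The options above combine into invocations such as the following (illustrative only;
  // the jar path and resource sizes are placeholders, and the class named is this client):
  //
  //   yarn jar /path/to/distributedshell.jar \
  //       org.apache.hadoop.yarn.applications.distributedshell.Client \
  //       -jar /path/to/distributedshell.jar -shell_command date \
  //       -num_containers 2 -container_memory 128 -master_memory 256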
291    
  /**
   * Create a new Client backed by a freshly created {@link YarnConfiguration}.
   */
294      public Client() throws Exception  {
295        this(new YarnConfiguration());
296      }
297    
298      /**
299       * Helper function to print out usage
300       */
301      private void printUsage() {
302        new HelpFormatter().printHelp("Client", opts);
303      }
304    
  /**
   * Parse command line options.
   * @param args Command line arguments
   * @return true if the client should go on to run the application,
   *         false if it should exit (for example, after printing help)
   * @throws ParseException
   */
311      public boolean init(String[] args) throws ParseException {
312    
313        CommandLine cliParser = new GnuParser().parse(opts, args);
314    
315        if (args.length == 0) {
316          throw new IllegalArgumentException("No args specified for client to initialize");
317        }
318    
319        if (cliParser.hasOption("log_properties")) {
320          String log4jPath = cliParser.getOptionValue("log_properties");
321          try {
322            Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
323          } catch (Exception e) {
324            LOG.warn("Can not set up custom log4j properties. " + e);
325          }
326        }
327    
328        if (cliParser.hasOption("help")) {
329          printUsage();
330          return false;
331        }
332    
333        if (cliParser.hasOption("debug")) {
334          debugFlag = true;
335    
336        }
337    
338        if (cliParser.hasOption("keep_containers_across_application_attempts")) {
      LOG.info("keep_containers_across_application_attempts is enabled");
340          keepContainers = true;
341        }
342    
343        appName = cliParser.getOptionValue("appname", "DistributedShell");
344        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
345        amQueue = cliParser.getOptionValue("queue", "default");
346        amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));               
347        amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
348    
349        if (amMemory < 0) {
350          throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
351              + " Specified memory=" + amMemory);
352        }
353        if (amVCores < 0) {
354          throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
355              + " Specified virtual cores=" + amVCores);
356        }
357    
358        if (!cliParser.hasOption("jar")) {
359          throw new IllegalArgumentException("No jar file specified for application master");
360        }           
361    
362        appMasterJar = cliParser.getOptionValue("jar");
363    
364        if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
365          throw new IllegalArgumentException(
366              "No shell command or shell script specified to be executed by application master");
367        } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
368          throw new IllegalArgumentException("Can not specify shell_command option " +
369              "and shell_script option at the same time");
370        } else if (cliParser.hasOption("shell_command")) {
371          shellCommand = cliParser.getOptionValue("shell_command");
372        } else {
373          shellScriptPath = cliParser.getOptionValue("shell_script");
374        }
375        if (cliParser.hasOption("shell_args")) {
376          shellArgs = cliParser.getOptionValues("shell_args");
377        }
378        if (cliParser.hasOption("shell_env")) { 
379          String envs[] = cliParser.getOptionValues("shell_env");
380          for (String env : envs) {
381            env = env.trim();
382            int index = env.indexOf('=');
383            if (index == -1) {
384              shellEnv.put(env, "");
385              continue;
386            }
387            String key = env.substring(0, index);
388            String val = "";
389            if (index < (env.length()-1)) {
390              val = env.substring(index+1);
391            }
392            shellEnv.put(key, val);
393          }
394        }
395        shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
396    
397        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
398        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
399        numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
400        
401    
402        if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
403          throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
404              + " exiting."
405              + " Specified containerMemory=" + containerMemory
406              + ", containerVirtualCores=" + containerVirtualCores
407              + ", numContainer=" + numContainers);
408        }
409        
410        nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
411    
412        clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
413    
414        attemptFailuresValidityInterval =
415            Long.parseLong(cliParser.getOptionValue(
416              "attempt_failures_validity_interval", "-1"));
417    
418        log4jPropFile = cliParser.getOptionValue("log_properties", "");
419    
420        // Get timeline domain options
421        if (cliParser.hasOption("domain")) {
422          domainId = cliParser.getOptionValue("domain");
423          toCreateDomain = cliParser.hasOption("create");
424          if (cliParser.hasOption("view_acls")) {
425            viewACLs = cliParser.getOptionValue("view_acls");
426          }
427          if (cliParser.hasOption("modify_acls")) {
428            modifyACLs = cliParser.getOptionValue("modify_acls");
429          }
430        }
431    
432        return true;
433      }
434    
435      /**
436       * Main run function for the client
437       * @return true if application completed successfully
438       * @throws IOException
439       * @throws YarnException
440       */
441      public boolean run() throws IOException, YarnException {
442    
443        LOG.info("Running Client");
444        yarnClient.start();
445    
446        YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
447        LOG.info("Got Cluster metric info from ASM" 
448            + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
449    
450        List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
451            NodeState.RUNNING);
452        LOG.info("Got Cluster node info from ASM");
453        for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress=" + node.getHttpAddress()
          + ", nodeRackName=" + node.getRackName()
          + ", nodeNumContainers=" + node.getNumContainers());
459        }
460    
461        QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
462        LOG.info("Queue info"
463            + ", queueName=" + queueInfo.getQueueName()
464            + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
465            + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
466            + ", queueApplicationCount=" + queueInfo.getApplications().size()
467            + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
468    
469        List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
470        for (QueueUserACLInfo aclInfo : listAclInfo) {
471          for (QueueACL userAcl : aclInfo.getUserAcls()) {
472            LOG.info("User ACL Info for Queue"
473                + ", queueName=" + aclInfo.getQueueName()                   
474                + ", userAcl=" + userAcl.name());
475          }
476        }           
477    
478        if (domainId != null && domainId.length() > 0 && toCreateDomain) {
479          prepareTimelineDomain();
480        }
481    
482        // Get a new application id
483        YarnClientApplication app = yarnClient.createApplication();
484        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
485        // TODO get min/max resource capabilities from RM and change memory ask if needed
486        // If we do not have min/max, we may not be able to correctly request 
487        // the required resources from the RM for the app master
488        // Memory ask has to be a multiple of min and less than max. 
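    // One way the ask could be normalized, as a rough sketch (not executed here;
    // minMem would come from the scheduler's yarn.scheduler.minimum-allocation-mb):
    //   amMemory = Math.min(maxMem, ((amMemory + minMem - 1) / minMem) * minMem);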
489        // Dump out information about cluster capability as seen by the resource manager
490        int maxMem = appResponse.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);
492    
493        // A resource ask cannot exceed the max. 
494        if (amMemory > maxMem) {
495          LOG.info("AM memory specified above max threshold of cluster. Using max value."
496              + ", specified=" + amMemory
497              + ", max=" + maxMem);
498          amMemory = maxMem;
499        }                           
500    
501        int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
503        
504        if (amVCores > maxVCores) {
505          LOG.info("AM virtual cores specified above max threshold of cluster. " 
506              + "Using max value." + ", specified=" + amVCores 
507              + ", max=" + maxVCores);
508          amVCores = maxVCores;
509        }
510        
511        // set the application name
512        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
513        ApplicationId appId = appContext.getApplicationId();
514    
515        appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
516        appContext.setApplicationName(appName);
517    
518        if (attemptFailuresValidityInterval >= 0) {
519          appContext
520            .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
521        }
522    
523        // set local resources for the application master
524        // local files or archives as needed
525        // In this scenario, the jar file for the application master is part of the local resources                 
526        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
527    
    LOG.info("Copy App Master jar from local filesystem and add to local resources");
529        // Copy the application master jar to the filesystem 
530        // Create a local resource to point to the destination jar path 
531        FileSystem fs = FileSystem.get(conf);
532        addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
533            localResources, null);
534    
535        // Set the log4j properties if needed 
536        if (!log4jPropFile.isEmpty()) {
537          addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
538              localResources, null);
539        }                   
540    
    // The shell script has to be made available on the final container(s)
    // where it will be executed. 
    // To do this, we first need to copy it into a filesystem that is visible 
    // to the YARN framework. 
    // We do not need to set it as a local resource for the application 
    // master, as the application master does not need it.
547        String hdfsShellScriptLocation = ""; 
548        long hdfsShellScriptLen = 0;
549        long hdfsShellScriptTimestamp = 0;
550        if (!shellScriptPath.isEmpty()) {
551          Path shellSrc = new Path(shellScriptPath);
552          String shellPathSuffix =
553              appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
554          Path shellDst =
555              new Path(fs.getHomeDirectory(), shellPathSuffix);
556          fs.copyFromLocalFile(false, true, shellSrc, shellDst);
557          hdfsShellScriptLocation = shellDst.toUri().toString(); 
558          FileStatus shellFileStatus = fs.getFileStatus(shellDst);
559          hdfsShellScriptLen = shellFileStatus.getLen();
560          hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
561        }
562    
563        if (!shellCommand.isEmpty()) {
564          addToLocalResources(fs, null, shellCommandPath, appId.toString(),
565              localResources, shellCommand);
566        }
567    
568        if (shellArgs.length > 0) {
569          addToLocalResources(fs, null, shellArgsPath, appId.toString(),
570              localResources, StringUtils.join(shellArgs, " "));
571        }
572    
573        // Set the necessary security tokens as needed
574        //amContainer.setContainerTokens(containerToken);
575    
576        // Set the env variables to be setup in the env where the application master will be run
577        LOG.info("Set the environment for the application master");
578        Map<String, String> env = new HashMap<String, String>();
579    
580        // put location of shell script into env
581        // using the env info, the application master will create the correct local resource for the 
582        // eventual containers that will be launched to execute the shell scripts
583        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
584        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
585        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
586        if (domainId != null && domainId.length() > 0) {
587          env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
588        }
589    
590        // Add AppMaster.jar location to classpath          
591        // At some point we should not be required to add 
592        // the hadoop specific classpaths to the env. 
593        // It should be provided out of the box. 
594        // For now setting all required classpaths including
595        // the classpath to "." for the application jar
596        StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
597          .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
598        for (String c : conf.getStrings(
599            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
600            YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
601          classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
602          classPathEnv.append(c.trim());
603        }
604        classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
605          "./log4j.properties");
606    
607        // add the runtime classpath needed for tests to work
608        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
609          classPathEnv.append(':');
610          classPathEnv.append(System.getProperty("java.class.path"));
611        }
612    
613        env.put("CLASSPATH", classPathEnv.toString());
614    
615        // Set the necessary command to execute the application master 
616        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
617    
618        // Set java executable command 
619        LOG.info("Setting up app master command");
620        vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
621        // Set Xmx based on am memory size
622        vargs.add("-Xmx" + amMemory + "m");
623        // Set class name 
624        vargs.add(appMasterMainClass);
625        // Set params for Application Master
626        vargs.add("--container_memory " + String.valueOf(containerMemory));
627        vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
628        vargs.add("--num_containers " + String.valueOf(numContainers));
629        if (null != nodeLabelExpression) {
630          appContext.setNodeLabelExpression(nodeLabelExpression);
631        }
632        vargs.add("--priority " + String.valueOf(shellCmdPriority));
633    
634        for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
635          vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
636        }                   
637        if (debugFlag) {
638          vargs.add("--debug");
639        }
640    
641        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
642        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
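
    // With the default settings, the assembled command looks roughly like this
    // (illustrative; <java_home> and <appMasterMainClass> stand for the expanded values):
    //   <java_home>/bin/java -Xmx10m <appMasterMainClass> --container_memory 10
    //     --container_vcores 1 --num_containers 1 --priority 0
    //     1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr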
643    
    // Get the final command
645        StringBuilder command = new StringBuilder();
646        for (CharSequence str : vargs) {
647          command.append(str).append(" ");
648        }
649    
650        LOG.info("Completed setting up app master command " + command.toString());     
651        List<String> commands = new ArrayList<String>();
652        commands.add(command.toString());           
653    
654        // Set up the container launch context for the application master
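    // The three trailing nulls are, in order, service data, security tokens and
    // application ACLs; the tokens are attached below once security is checked.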
655        ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
656          localResources, env, commands, null, null, null);
657    
658        // Set up resource type requirements
659        // For now, both memory and vcores are supported, so we set memory and 
660        // vcores requirements
661        Resource capability = Resource.newInstance(amMemory, amVCores);
662        appContext.setResource(capability);
663    
664        // Service data is a binary blob that can be passed to the application
665        // Not needed in this scenario
666        // amContainer.setServiceData(serviceData);
667    
668        // Setup security tokens
669        if (UserGroupInformation.isSecurityEnabled()) {
670          // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
671          Credentials credentials = new Credentials();
672          String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
673          if (tokenRenewer == null || tokenRenewer.length() == 0) {
674            throw new IOException(
675              "Can't get Master Kerberos principal for the RM to use as renewer");
676          }
677    
678          // For now, only getting tokens for the default file-system.
679          final Token<?> tokens[] =
680              fs.addDelegationTokens(tokenRenewer, credentials);
681          if (tokens != null) {
682            for (Token<?> token : tokens) {
683              LOG.info("Got dt for " + fs.getUri() + "; " + token);
684            }
685          }
686          DataOutputBuffer dob = new DataOutputBuffer();
687          credentials.writeTokenStorageToStream(dob);
688          ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
689          amContainer.setTokens(fsTokens);
690        }
691    
692        appContext.setAMContainerSpec(amContainer);
693    
694        // Set the priority for the application master
695        // TODO - what is the range for priority? how to decide? 
696        Priority pri = Priority.newInstance(amPriority);
697        appContext.setPriority(pri);
698    
699        // Set the queue to which this application is to be submitted in the RM
700        appContext.setQueue(amQueue);
701    
702        // Submit the application to the applications manager
703        // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
704        // Ignore the response as either a valid response object is returned on success 
705        // or an exception thrown to denote some form of a failure
706        LOG.info("Submitting application to ASM");
707    
708        yarnClient.submitApplication(appContext);
709    
710        // TODO
711        // Try submitting the same request again
712        // app submission failure?
713    
714        // Monitor the application
715        return monitorApplication(appId);
716    
717      }
718    
719      /**
720       * Monitor the submitted application for completion. 
721       * Kill application if time expires. 
722       * @param appId Application Id of application to be monitored
723       * @return true if application completed successfully
724       * @throws YarnException
725       * @throws IOException
726       */
727      private boolean monitorApplication(ApplicationId appId)
728          throws YarnException, IOException {
729    
730        while (true) {
731    
732          // Check app status every 1 second.
733          try {
734            Thread.sleep(1000);
735          } catch (InterruptedException e) {
736            LOG.debug("Thread sleep in monitoring loop interrupted");
737          }
738    
739          // Get application report for the appId we are interested in 
740          ApplicationReport report = yarnClient.getApplicationReport(appId);
741    
742          LOG.info("Got application report from ASM for"
743              + ", appId=" + appId.getId()
744              + ", clientToAMToken=" + report.getClientToAMToken()
745              + ", appDiagnostics=" + report.getDiagnostics()
746              + ", appMasterHost=" + report.getHost()
747              + ", appQueue=" + report.getQueue()
748              + ", appMasterRpcPort=" + report.getRpcPort()
749              + ", appStartTime=" + report.getStartTime()
750              + ", yarnAppState=" + report.getYarnApplicationState().toString()
751              + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
752              + ", appTrackingUrl=" + report.getTrackingUrl()
753              + ", appUser=" + report.getUser());
754    
755          YarnApplicationState state = report.getYarnApplicationState();
756          FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
757          if (YarnApplicationState.FINISHED == state) {
758            if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
759              LOG.info("Application has completed successfully. Breaking monitoring loop");
760              return true;        
761            }
762            else {
          LOG.info("Application finished unsuccessfully."
764                  + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
765                  + ". Breaking monitoring loop");
766              return false;
767            }                         
768          }
769          else if (YarnApplicationState.KILLED == state     
770              || YarnApplicationState.FAILED == state) {
771            LOG.info("Application did not finish."
772                + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
773                + ". Breaking monitoring loop");
774            return false;
775          }                 
776    
777          if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
778            LOG.info("Reached client specified timeout for application. Killing application");
779            forceKillApplication(appId);
780            return false;                           
781          }
782        }                   
783    
784      }
785    
786      /**
787       * Kill a submitted application by sending a call to the ASM
788       * @param appId Application Id to be killed. 
789       * @throws YarnException
790       * @throws IOException
791       */
792      private void forceKillApplication(ApplicationId appId)
793          throws YarnException, IOException {
794        // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
795        // the same time. 
796        // If yes, can we kill a particular attempt only?
797    
798        // Response can be ignored as it is non-null on success or 
799        // throws an exception in case of failures
800        yarnClient.killApplication(appId);  
801      }
802    
803      private void addToLocalResources(FileSystem fs, String fileSrcPath,
804          String fileDstPath, String appId, Map<String, LocalResource> localResources,
805          String resources) throws IOException {
806        String suffix =
807            appName + "/" + appId + "/" + fileDstPath;
808        Path dst =
809            new Path(fs.getHomeDirectory(), suffix);
810        if (fileSrcPath == null) {
811          FSDataOutputStream ostream = null;
812          try {
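        // 0710: owner read/write/execute, group execute only, no access for others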
813            ostream = FileSystem
814                .create(fs, dst, new FsPermission((short) 0710));
815            ostream.writeUTF(resources);
816          } finally {
817            IOUtils.closeQuietly(ostream);
818          }
819        } else {
820          fs.copyFromLocalFile(new Path(fileSrcPath), dst);
821        }
822        FileStatus scFileStatus = fs.getFileStatus(dst);
823        LocalResource scRsrc =
824            LocalResource.newInstance(
825                ConverterUtils.getYarnUrlFromURI(dst.toUri()),
826                LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
827                scFileStatus.getLen(), scFileStatus.getModificationTime());
828        localResources.put(fileDstPath, scRsrc);
829      }
830    
831      private void prepareTimelineDomain() {
832        TimelineClient timelineClient = null;
833        if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
834            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
835          timelineClient = TimelineClient.createTimelineClient();
836          timelineClient.init(conf);
837          timelineClient.start();
838        } else {
839          LOG.warn("Cannot put the domain " + domainId +
840              " because the timeline service is not enabled");
841          return;
842        }
843        try {
844          //TODO: we need to check and combine the existing timeline domain ACLs,
845          //but let's do it once we have client java library to query domains.
846          TimelineDomain domain = new TimelineDomain();
847          domain.setId(domainId);
848          domain.setReaders(
849              viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
850          domain.setWriters(
851              modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
852          timelineClient.putDomain(domain);
853          LOG.info("Put the timeline domain: " +
854              TimelineUtils.dumpTimelineRecordtoJSON(domain));
855        } catch (Exception e) {
856          LOG.error("Error when putting the timeline domain", e);
857        } finally {
858          timelineClient.stop();
859        }
860      }
861    }