001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Vector;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.GnuParser;
031import org.apache.commons.cli.HelpFormatter;
032import org.apache.commons.cli.Option;
033import org.apache.commons.cli.Options;
034import org.apache.commons.cli.ParseException;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang.StringUtils;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.apache.hadoop.classification.InterfaceAudience;
040import org.apache.hadoop.classification.InterfaceStability;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.permission.FsPermission;
047import org.apache.hadoop.io.DataOutputBuffer;
048import org.apache.hadoop.security.Credentials;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.apache.hadoop.security.token.Token;
051import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052import org.apache.hadoop.yarn.api.ApplicationConstants;
053import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056import org.apache.hadoop.yarn.api.records.ApplicationId;
057import org.apache.hadoop.yarn.api.records.ApplicationReport;
058import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061import org.apache.hadoop.yarn.api.records.LocalResource;
062import org.apache.hadoop.yarn.api.records.LocalResourceType;
063import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064import org.apache.hadoop.yarn.api.records.NodeReport;
065import org.apache.hadoop.yarn.api.records.NodeState;
066import org.apache.hadoop.yarn.api.records.Priority;
067import org.apache.hadoop.yarn.api.records.QueueACL;
068import org.apache.hadoop.yarn.api.records.QueueInfo;
069import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070import org.apache.hadoop.yarn.api.records.Resource;
071import org.apache.hadoop.yarn.api.records.URL;
072import org.apache.hadoop.yarn.api.records.YarnApplicationState;
073import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
074import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
075import org.apache.hadoop.yarn.client.api.TimelineClient;
076import org.apache.hadoop.yarn.client.api.YarnClient;
077import org.apache.hadoop.yarn.client.api.YarnClientApplication;
078import org.apache.hadoop.yarn.conf.YarnConfiguration;
079import org.apache.hadoop.yarn.exceptions.YarnException;
080import org.apache.hadoop.yarn.util.ConverterUtils;
081import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
082
083/**
084 * Client for Distributed Shell application submission to YARN.
085 * 
086 * <p> The distributed shell client allows an application master to be launched that in turn would run 
087 * the provided shell command on a set of containers. </p>
088 * 
089 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
090 * 
091 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
092 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
093 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
095 * 
096 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
097 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
098 * and application name, the priority assigned to the application and the queue
099 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
100 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
101 * the {@link ApplicationMaster} is launched. </p>
102 * 
103 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
104 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
105 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
 * {@link ApplicationMaster}. </p>
107 * 
108 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
109 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
110 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
111 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
112 *
113 */
114@InterfaceAudience.Public
115@InterfaceStability.Unstable
116public class Client {
117
118  private static final Log LOG = LogFactory.getLog(Client.class);
119  
120  // Configuration
121  private Configuration conf;
122  private YarnClient yarnClient;
123  // Application master specific info to register a new Application with RM/ASM
124  private String appName = "";
125  // App master priority
126  private int amPriority = 0;
127  // Queue for App master
128  private String amQueue = "";
129  // Amt. of memory resource to request for to run the App Master
130  private long amMemory = 100;
131  // Amt. of virtual core resource to request for to run the App Master
132  private int amVCores = 1;
133
134  // Application master jar file
135  private String appMasterJar = ""; 
136  // Main class to invoke application master
137  private final String appMasterMainClass;
138
139  // Shell command to be executed 
140  private String shellCommand = ""; 
141  // Location of shell script 
142  private String shellScriptPath = ""; 
143  // Args to be passed to the shell command
144  private String[] shellArgs = new String[] {};
145  // Env variables to be setup for the shell command 
146  private Map<String, String> shellEnv = new HashMap<String, String>();
147  // Shell Command Container priority 
148  private int shellCmdPriority = 0;
149
150  // Amt of memory to request for container in which shell script will be executed
151  private int containerMemory = 10; 
152  // Amt. of virtual cores to request for container in which shell script will be executed
153  private int containerVirtualCores = 1;
154  // No. of containers in which the shell script needs to be executed
155  private int numContainers = 1;
156  private String nodeLabelExpression = null;
157
158  // log4j.properties file 
159  // if available, add to local resources and set into classpath 
160  private String log4jPropFile = "";    
161
162  // Start time for client
163  private final long clientStartTime = System.currentTimeMillis();
164  // Timeout threshold for client. Kill app after time interval expires.
165  private long clientTimeout = 600000;
166
167  // flag to indicate whether to keep containers across application attempts.
168  private boolean keepContainers = false;
169
170  private long attemptFailuresValidityInterval = -1;
171
172  // Debug flag
173  boolean debugFlag = false;
174
175  // Timeline domain ID
176  private String domainId = null;
177
178  // Flag to indicate whether to create the domain of the given ID
179  private boolean toCreateDomain = false;
180
181  // Timeline domain reader access control
182  private String viewACLs = null;
183
184  // Timeline domain writer access control
185  private String modifyACLs = null;
186
187  // Command line options
188  private Options opts;
189
190  private static final String shellCommandPath = "shellCommands";
191  private static final String shellArgsPath = "shellArgs";
192  private static final String appMasterJarPath = "AppMaster.jar";
193  // Hardcoded path to custom log_properties
194  private static final String log4jPath = "log4j.properties";
195
196  public static final String SCRIPT_PATH = "ExecScript";
197
198  /**
199   * @param args Command line arguments 
200   */
201  public static void main(String[] args) {
202    boolean result = false;
203    try {
204      Client client = new Client();
205      LOG.info("Initializing Client");
206      try {
207        boolean doRun = client.init(args);
208        if (!doRun) {
209          System.exit(0);
210        }
211      } catch (IllegalArgumentException e) {
212        System.err.println(e.getLocalizedMessage());
213        client.printUsage();
214        System.exit(-1);
215      }
216      result = client.run();
217    } catch (Throwable t) {
218      LOG.fatal("Error running Client", t);
219      System.exit(1);
220    }
221    if (result) {
222      LOG.info("Application completed successfully");
223      System.exit(0);                   
224    } 
225    LOG.error("Application failed to complete successfully");
226    System.exit(2);
227  }
228
229  /**
230   */
231  public Client(Configuration conf) throws Exception  {
232    this(
233      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
234      conf);
235  }
236
237  Client(String appMasterMainClass, Configuration conf) {
238    this.conf = conf;
239    this.appMasterMainClass = appMasterMainClass;
240    yarnClient = YarnClient.createYarnClient();
241    yarnClient.init(conf);
242    opts = new Options();
243    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
244    opts.addOption("priority", true, "Application Priority. Default 0");
245    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
246    opts.addOption("timeout", true, "Application timeout in milliseconds");
247    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
248    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
249    opts.addOption("jar", true, "Jar file containing the application master");
250    opts.addOption("shell_command", true, "Shell command to be executed by " +
251        "the Application Master. Can only specify either --shell_command " +
252        "or --shell_script");
253    opts.addOption("shell_script", true, "Location of the shell script to be " +
254        "executed. Can only specify either --shell_command or --shell_script");
255    opts.addOption("shell_args", true, "Command line args for the shell script." +
256        "Multiple args can be separated by empty space.");
257    opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
258    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
259    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
260    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
261    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
262    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
263    opts.addOption("log_properties", true, "log4j.properties file");
264    opts.addOption("keep_containers_across_application_attempts", false,
265      "Flag to indicate whether to keep containers across application attempts." +
266      " If the flag is true, running containers will not be killed when" +
267      " application attempt fails and these containers will be retrieved by" +
268      " the new application attempt ");
269    opts.addOption("attempt_failures_validity_interval", true,
270      "when attempt_failures_validity_interval in milliseconds is set to > 0," +
271      "the failure number will not take failures which happen out of " +
272      "the validityInterval into failure count. " +
273      "If failure count reaches to maxAppAttempts, " +
274      "the application will be failed.");
275    opts.addOption("debug", false, "Dump out debug information");
276    opts.addOption("domain", true, "ID of the timeline domain where the "
277        + "timeline entities will be put");
278    opts.addOption("view_acls", true, "Users and groups that allowed to "
279        + "view the timeline entities in the given domain");
280    opts.addOption("modify_acls", true, "Users and groups that allowed to "
281        + "modify the timeline entities in the given domain");
282    opts.addOption("create", false, "Flag to indicate whether to create the "
283        + "domain specified with -domain.");
284    opts.addOption("help", false, "Print usage");
285    opts.addOption("node_label_expression", true,
286        "Node label expression to determine the nodes"
287            + " where all the containers of this application"
288            + " will be allocated, \"\" means containers"
289            + " can be allocated anywhere, if you don't specify the option,"
290            + " default node_label_expression of queue will be used.");
291  }
292
293  /**
294   */
295  public Client() throws Exception  {
296    this(new YarnConfiguration());
297  }
298
299  /**
300   * Helper function to print out usage
301   */
302  private void printUsage() {
303    new HelpFormatter().printHelp("Client", opts);
304  }
305
306  /**
307   * Parse command line options
308   * @param args Parsed command line options 
309   * @return Whether the init was successful to run the client
310   * @throws ParseException
311   */
312  public boolean init(String[] args) throws ParseException {
313
314    CommandLine cliParser = new GnuParser().parse(opts, args);
315
316    if (args.length == 0) {
317      throw new IllegalArgumentException("No args specified for client to initialize");
318    }
319
320    if (cliParser.hasOption("log_properties")) {
321      String log4jPath = cliParser.getOptionValue("log_properties");
322      try {
323        Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
324      } catch (Exception e) {
325        LOG.warn("Can not set up custom log4j properties. " + e);
326      }
327    }
328
329    if (cliParser.hasOption("help")) {
330      printUsage();
331      return false;
332    }
333
334    if (cliParser.hasOption("debug")) {
335      debugFlag = true;
336
337    }
338
339    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
340      LOG.info("keep_containers_across_application_attempts");
341      keepContainers = true;
342    }
343
344    appName = cliParser.getOptionValue("appname", "DistributedShell");
345    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
346    amQueue = cliParser.getOptionValue("queue", "default");
347    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100"));
348    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
349
350    if (amMemory < 0) {
351      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
352          + " Specified memory=" + amMemory);
353    }
354    if (amVCores < 0) {
355      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
356          + " Specified virtual cores=" + amVCores);
357    }
358
359    if (!cliParser.hasOption("jar")) {
360      throw new IllegalArgumentException("No jar file specified for application master");
361    }           
362
363    appMasterJar = cliParser.getOptionValue("jar");
364
365    if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
366      throw new IllegalArgumentException(
367          "No shell command or shell script specified to be executed by application master");
368    } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
369      throw new IllegalArgumentException("Can not specify shell_command option " +
370          "and shell_script option at the same time");
371    } else if (cliParser.hasOption("shell_command")) {
372      shellCommand = cliParser.getOptionValue("shell_command");
373    } else {
374      shellScriptPath = cliParser.getOptionValue("shell_script");
375    }
376    if (cliParser.hasOption("shell_args")) {
377      shellArgs = cliParser.getOptionValues("shell_args");
378    }
379    if (cliParser.hasOption("shell_env")) { 
380      String envs[] = cliParser.getOptionValues("shell_env");
381      for (String env : envs) {
382        env = env.trim();
383        int index = env.indexOf('=');
384        if (index == -1) {
385          shellEnv.put(env, "");
386          continue;
387        }
388        String key = env.substring(0, index);
389        String val = "";
390        if (index < (env.length()-1)) {
391          val = env.substring(index+1);
392        }
393        shellEnv.put(key, val);
394      }
395    }
396    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
397
398    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
399    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
400    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
401    
402
403    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
404      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
405          + " exiting."
406          + " Specified containerMemory=" + containerMemory
407          + ", containerVirtualCores=" + containerVirtualCores
408          + ", numContainer=" + numContainers);
409    }
410    
411    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
412
413    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
414
415    attemptFailuresValidityInterval =
416        Long.parseLong(cliParser.getOptionValue(
417          "attempt_failures_validity_interval", "-1"));
418
419    log4jPropFile = cliParser.getOptionValue("log_properties", "");
420
421    // Get timeline domain options
422    if (cliParser.hasOption("domain")) {
423      domainId = cliParser.getOptionValue("domain");
424      toCreateDomain = cliParser.hasOption("create");
425      if (cliParser.hasOption("view_acls")) {
426        viewACLs = cliParser.getOptionValue("view_acls");
427      }
428      if (cliParser.hasOption("modify_acls")) {
429        modifyACLs = cliParser.getOptionValue("modify_acls");
430      }
431    }
432
433    return true;
434  }
435
436  /**
437   * Main run function for the client
438   * @return true if application completed successfully
439   * @throws IOException
440   * @throws YarnException
441   */
442  public boolean run() throws IOException, YarnException {
443
444    LOG.info("Running Client");
445    yarnClient.start();
446
447    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
448    LOG.info("Got Cluster metric info from ASM" 
449        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
450
451    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
452        NodeState.RUNNING);
453    LOG.info("Got Cluster node info from ASM");
454    for (NodeReport node : clusterNodeReports) {
455      LOG.info("Got node report from ASM for"
456          + ", nodeId=" + node.getNodeId() 
457          + ", nodeAddress=" + node.getHttpAddress()
458          + ", nodeRackName=" + node.getRackName()
459          + ", nodeNumContainers=" + node.getNumContainers());
460    }
461
462    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
463    LOG.info("Queue info"
464        + ", queueName=" + queueInfo.getQueueName()
465        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
466        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
467        + ", queueApplicationCount=" + queueInfo.getApplications().size()
468        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
469
470    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
471    for (QueueUserACLInfo aclInfo : listAclInfo) {
472      for (QueueACL userAcl : aclInfo.getUserAcls()) {
473        LOG.info("User ACL Info for Queue"
474            + ", queueName=" + aclInfo.getQueueName()                   
475            + ", userAcl=" + userAcl.name());
476      }
477    }           
478
479    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
480      prepareTimelineDomain();
481    }
482
483    // Get a new application id
484    YarnClientApplication app = yarnClient.createApplication();
485    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
486    // TODO get min/max resource capabilities from RM and change memory ask if needed
487    // If we do not have min/max, we may not be able to correctly request 
488    // the required resources from the RM for the app master
489    // Memory ask has to be a multiple of min and less than max. 
490    // Dump out information about cluster capability as seen by the resource manager
491    long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
492    LOG.info("Max mem capability of resources in this cluster " + maxMem);
493
494    // A resource ask cannot exceed the max. 
495    if (amMemory > maxMem) {
496      LOG.info("AM memory specified above max threshold of cluster. Using max value."
497          + ", specified=" + amMemory
498          + ", max=" + maxMem);
499      amMemory = maxMem;
500    }                           
501
502    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
503    LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
504    
505    if (amVCores > maxVCores) {
506      LOG.info("AM virtual cores specified above max threshold of cluster. " 
507          + "Using max value." + ", specified=" + amVCores 
508          + ", max=" + maxVCores);
509      amVCores = maxVCores;
510    }
511    
512    // set the application name
513    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
514    ApplicationId appId = appContext.getApplicationId();
515
516    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
517    appContext.setApplicationName(appName);
518
519    if (attemptFailuresValidityInterval >= 0) {
520      appContext
521        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
522    }
523
524    // set local resources for the application master
525    // local files or archives as needed
526    // In this scenario, the jar file for the application master is part of the local resources                 
527    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
528
529    LOG.info("Copy App Master jar from local filesystem and add to local environment");
530    // Copy the application master jar to the filesystem 
531    // Create a local resource to point to the destination jar path 
532    FileSystem fs = FileSystem.get(conf);
533    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
534        localResources, null);
535
536    // Set the log4j properties if needed 
537    if (!log4jPropFile.isEmpty()) {
538      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
539          localResources, null);
540    }                   
541
542    // The shell script has to be made available on the final container(s)
543    // where it will be executed. 
544    // To do this, we need to first copy into the filesystem that is visible 
545    // to the yarn framework. 
546    // We do not need to set this as a local resource for the application 
547    // master as the application master does not need it.               
548    String hdfsShellScriptLocation = ""; 
549    long hdfsShellScriptLen = 0;
550    long hdfsShellScriptTimestamp = 0;
551    if (!shellScriptPath.isEmpty()) {
552      Path shellSrc = new Path(shellScriptPath);
553      String shellPathSuffix =
554          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
555      Path shellDst =
556          new Path(fs.getHomeDirectory(), shellPathSuffix);
557      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
558      hdfsShellScriptLocation = shellDst.toUri().toString(); 
559      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
560      hdfsShellScriptLen = shellFileStatus.getLen();
561      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
562    }
563
564    if (!shellCommand.isEmpty()) {
565      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
566          localResources, shellCommand);
567    }
568
569    if (shellArgs.length > 0) {
570      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
571          localResources, StringUtils.join(shellArgs, " "));
572    }
573
574    // Set the necessary security tokens as needed
575    //amContainer.setContainerTokens(containerToken);
576
577    // Set the env variables to be setup in the env where the application master will be run
578    LOG.info("Set the environment for the application master");
579    Map<String, String> env = new HashMap<String, String>();
580
581    // put location of shell script into env
582    // using the env info, the application master will create the correct local resource for the 
583    // eventual containers that will be launched to execute the shell scripts
584    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
585    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
586    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
587    if (domainId != null && domainId.length() > 0) {
588      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
589    }
590
591    // Add AppMaster.jar location to classpath          
592    // At some point we should not be required to add 
593    // the hadoop specific classpaths to the env. 
594    // It should be provided out of the box. 
595    // For now setting all required classpaths including
596    // the classpath to "." for the application jar
597    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
598      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
599    for (String c : conf.getStrings(
600        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
601        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
602      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
603      classPathEnv.append(c.trim());
604    }
605    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
606      "./log4j.properties");
607
608    // add the runtime classpath needed for tests to work
609    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
610      classPathEnv.append(':');
611      classPathEnv.append(System.getProperty("java.class.path"));
612    }
613
614    env.put("CLASSPATH", classPathEnv.toString());
615
616    // Set the necessary command to execute the application master 
617    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
618
619    // Set java executable command 
620    LOG.info("Setting up app master command");
621    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
622    // Set Xmx based on am memory size
623    vargs.add("-Xmx" + amMemory + "m");
624    // Set class name 
625    vargs.add(appMasterMainClass);
626    // Set params for Application Master
627    vargs.add("--container_memory " + String.valueOf(containerMemory));
628    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
629    vargs.add("--num_containers " + String.valueOf(numContainers));
630    if (null != nodeLabelExpression) {
631      appContext.setNodeLabelExpression(nodeLabelExpression);
632    }
633    vargs.add("--priority " + String.valueOf(shellCmdPriority));
634
635    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
636      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
637    }                   
638    if (debugFlag) {
639      vargs.add("--debug");
640    }
641
642    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
643    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
644
645    // Get final commmand
646    StringBuilder command = new StringBuilder();
647    for (CharSequence str : vargs) {
648      command.append(str).append(" ");
649    }
650
651    LOG.info("Completed setting up app master command " + command.toString());     
652    List<String> commands = new ArrayList<String>();
653    commands.add(command.toString());           
654
655    // Set up the container launch context for the application master
656    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
657      localResources, env, commands, null, null, null);
658
659    // Set up resource type requirements
660    // For now, both memory and vcores are supported, so we set memory and 
661    // vcores requirements
662    Resource capability = Resource.newInstance(amMemory, amVCores);
663    appContext.setResource(capability);
664
665    // Service data is a binary blob that can be passed to the application
666    // Not needed in this scenario
667    // amContainer.setServiceData(serviceData);
668
669    // Setup security tokens
670    if (UserGroupInformation.isSecurityEnabled()) {
671      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
672      Credentials credentials = new Credentials();
673      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
674      if (tokenRenewer == null || tokenRenewer.length() == 0) {
675        throw new IOException(
676          "Can't get Master Kerberos principal for the RM to use as renewer");
677      }
678
679      // For now, only getting tokens for the default file-system.
680      final Token<?> tokens[] =
681          fs.addDelegationTokens(tokenRenewer, credentials);
682      if (tokens != null) {
683        for (Token<?> token : tokens) {
684          LOG.info("Got dt for " + fs.getUri() + "; " + token);
685        }
686      }
687      DataOutputBuffer dob = new DataOutputBuffer();
688      credentials.writeTokenStorageToStream(dob);
689      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
690      amContainer.setTokens(fsTokens);
691    }
692
693    appContext.setAMContainerSpec(amContainer);
694
695    // Set the priority for the application master
696    // TODO - what is the range for priority? how to decide? 
697    Priority pri = Priority.newInstance(amPriority);
698    appContext.setPriority(pri);
699
700    // Set the queue to which this application is to be submitted in the RM
701    appContext.setQueue(amQueue);
702
703    // Submit the application to the applications manager
704    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
705    // Ignore the response as either a valid response object is returned on success 
706    // or an exception thrown to denote some form of a failure
707    LOG.info("Submitting application to ASM");
708
709    yarnClient.submitApplication(appContext);
710
711    // TODO
712    // Try submitting the same request again
713    // app submission failure?
714
715    // Monitor the application
716    return monitorApplication(appId);
717
718  }
719
720  /**
721   * Monitor the submitted application for completion. 
722   * Kill application if time expires. 
723   * @param appId Application Id of application to be monitored
724   * @return true if application completed successfully
725   * @throws YarnException
726   * @throws IOException
727   */
728  private boolean monitorApplication(ApplicationId appId)
729      throws YarnException, IOException {
730
731    while (true) {
732
733      // Check app status every 1 second.
734      try {
735        Thread.sleep(1000);
736      } catch (InterruptedException e) {
737        LOG.debug("Thread sleep in monitoring loop interrupted");
738      }
739
740      // Get application report for the appId we are interested in 
741      ApplicationReport report = yarnClient.getApplicationReport(appId);
742
743      LOG.info("Got application report from ASM for"
744          + ", appId=" + appId.getId()
745          + ", clientToAMToken=" + report.getClientToAMToken()
746          + ", appDiagnostics=" + report.getDiagnostics()
747          + ", appMasterHost=" + report.getHost()
748          + ", appQueue=" + report.getQueue()
749          + ", appMasterRpcPort=" + report.getRpcPort()
750          + ", appStartTime=" + report.getStartTime()
751          + ", yarnAppState=" + report.getYarnApplicationState().toString()
752          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
753          + ", appTrackingUrl=" + report.getTrackingUrl()
754          + ", appUser=" + report.getUser());
755
756      YarnApplicationState state = report.getYarnApplicationState();
757      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
758      if (YarnApplicationState.FINISHED == state) {
759        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
760          LOG.info("Application has completed successfully. Breaking monitoring loop");
761          return true;        
762        }
763        else {
764          LOG.info("Application did finished unsuccessfully."
765              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
766              + ". Breaking monitoring loop");
767          return false;
768        }                         
769      }
770      else if (YarnApplicationState.KILLED == state     
771          || YarnApplicationState.FAILED == state) {
772        LOG.info("Application did not finish."
773            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
774            + ". Breaking monitoring loop");
775        return false;
776      }                 
777
778      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
779        LOG.info("Reached client specified timeout for application. Killing application");
780        forceKillApplication(appId);
781        return false;                           
782      }
783    }                   
784
785  }
786
787  /**
788   * Kill a submitted application by sending a call to the ASM
789   * @param appId Application Id to be killed. 
790   * @throws YarnException
791   * @throws IOException
792   */
793  private void forceKillApplication(ApplicationId appId)
794      throws YarnException, IOException {
795    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
796    // the same time. 
797    // If yes, can we kill a particular attempt only?
798
799    // Response can be ignored as it is non-null on success or 
800    // throws an exception in case of failures
801    yarnClient.killApplication(appId);  
802  }
803
804  private void addToLocalResources(FileSystem fs, String fileSrcPath,
805      String fileDstPath, String appId, Map<String, LocalResource> localResources,
806      String resources) throws IOException {
807    String suffix =
808        appName + "/" + appId + "/" + fileDstPath;
809    Path dst =
810        new Path(fs.getHomeDirectory(), suffix);
811    if (fileSrcPath == null) {
812      FSDataOutputStream ostream = null;
813      try {
814        ostream = FileSystem
815            .create(fs, dst, new FsPermission((short) 0710));
816        ostream.writeUTF(resources);
817      } finally {
818        IOUtils.closeQuietly(ostream);
819      }
820    } else {
821      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
822    }
823    FileStatus scFileStatus = fs.getFileStatus(dst);
824    LocalResource scRsrc =
825        LocalResource.newInstance(
826            URL.fromURI(dst.toUri()),
827            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
828            scFileStatus.getLen(), scFileStatus.getModificationTime());
829    localResources.put(fileDstPath, scRsrc);
830  }
831
832  private void prepareTimelineDomain() {
833    TimelineClient timelineClient = null;
834    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
835        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
836      timelineClient = TimelineClient.createTimelineClient();
837      timelineClient.init(conf);
838      timelineClient.start();
839    } else {
840      LOG.warn("Cannot put the domain " + domainId +
841          " because the timeline service is not enabled");
842      return;
843    }
844    try {
845      //TODO: we need to check and combine the existing timeline domain ACLs,
846      //but let's do it once we have client java library to query domains.
847      TimelineDomain domain = new TimelineDomain();
848      domain.setId(domainId);
849      domain.setReaders(
850          viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
851      domain.setWriters(
852          modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
853      timelineClient.putDomain(domain);
854      LOG.info("Put the timeline domain: " +
855          TimelineUtils.dumpTimelineRecordtoJSON(domain));
856    } catch (Exception e) {
857      LOG.error("Error when putting the timeline domain", e);
858    } finally {
859      timelineClient.stop();
860    }
861  }
862}