001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Vector;
030
031import org.apache.commons.cli.CommandLine;
032import org.apache.commons.cli.GnuParser;
033import org.apache.commons.cli.HelpFormatter;
034import org.apache.commons.cli.Options;
035import org.apache.commons.cli.ParseException;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.apache.hadoop.classification.InterfaceAudience;
039import org.apache.hadoop.classification.InterfaceStability;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.yarn.api.ApplicationConstants;
045import org.apache.hadoop.yarn.api.ClientRMProtocol;
046import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
047import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048import org.apache.hadoop.yarn.api.records.ApplicationId;
049import org.apache.hadoop.yarn.api.records.ApplicationReport;
050import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
051import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
052import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
053import org.apache.hadoop.yarn.api.records.LocalResource;
054import org.apache.hadoop.yarn.api.records.LocalResourceType;
055import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
056import org.apache.hadoop.yarn.api.records.NodeReport;
057import org.apache.hadoop.yarn.api.records.Priority;
058import org.apache.hadoop.yarn.api.records.QueueACL;
059import org.apache.hadoop.yarn.api.records.QueueInfo;
060import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
061import org.apache.hadoop.yarn.api.records.Resource;
062import org.apache.hadoop.yarn.api.records.YarnApplicationState;
063import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
064import org.apache.hadoop.yarn.client.YarnClientImpl;
065import org.apache.hadoop.yarn.conf.YarnConfiguration;
066import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
067import org.apache.hadoop.yarn.util.ConverterUtils;
068import org.apache.hadoop.yarn.util.Records;
069
070/**
071 * Client for Distributed Shell application submission to YARN.
072 * 
073 * <p> The distributed shell client allows an application master to be launched that in turn would run 
074 * the provided shell command on a set of containers. </p>
075 * 
076 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
077 * 
078 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
079 * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} 
080 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
082 * 
083 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
084 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
085 * and application name, user submitting the application, the priority assigned to the application and the queue 
086 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
087 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
088 * the {@link ApplicationMaster} is launched. </p>
089 * 
090 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
091 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
092 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
 * {@link ApplicationMaster}. </p>
094 * 
095 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
096 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
097 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
098 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
099 *
100 */
101@InterfaceAudience.Public
102@InterfaceStability.Unstable
103public class Client extends YarnClientImpl {
104
// Logger for this class
private static final Log LOG = LogFactory.getLog(Client.class);

// Configuration passed to the constructor; run() reads it to obtain the
// FileSystem and the YARN application classpath
private Configuration conf;

// Application master specific info to register a new Application with RM/ASM
// Application name (-appname option, init() defaults it to "DistributedShell")
private String appName = "";
// App master priority (-priority option)
private int amPriority = 0;
// Queue for App master (-queue option)
private String amQueue = "";
// User to run app master as (-user option)
private String amUser = "";
// Amt. of memory resource to request for to run the App Master
// (-master_memory option, described there as MB); run() clamps it to the
// min/max capability reported by the RM
private int amMemory = 10; 

// Application master jar file (-jar option, mandatory)
private String appMasterJar = ""; 
// Main class to invoke application master (-class option)
private String appMasterMainClass = "";

// Shell command to be executed (-shell_command option, mandatory)
private String shellCommand = ""; 
// Location of shell script (-shell_script option); empty means no script is uploaded
private String shellScriptPath = ""; 
// Args to be passed to the shell command (-shell_args option)
private String shellArgs = "";
// Env variables to be setup for the shell command (-shell_env key=value pairs)
private Map<String, String> shellEnv = new HashMap<String, String>();
// Shell Command Container priority (-shell_cmd_priority option)
private int shellCmdPriority = 0;

// Amt of memory to request for container in which shell script will be executed
// (-container_memory option, described there as MB)
private int containerMemory = 10; 
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;

// log4j.properties file 
// if available, add to local resources and set into classpath 
private String log4jPropFile = "";    

// Start time for client, captured when the instance is created
private final long clientStartTime = System.currentTimeMillis();
// Timeout threshold for client in milliseconds. Kill app after time interval expires.
private long clientTimeout = 600000;

// Debug flag; when set, "--debug" is appended to the app master command line
boolean debugFlag = false;    
153
154  /**
155   * @param args Command line arguments 
156   */
157  public static void main(String[] args) {
158    boolean result = false;
159    try {
160      Client client = new Client();
161      LOG.info("Initializing Client");
162      boolean doRun = client.init(args);
163      if (!doRun) {
164        System.exit(0);
165      }
166      result = client.run();
167    } catch (Throwable t) {
168      LOG.fatal("Error running CLient", t);
169      System.exit(1);
170    }
171    if (result) {
172      LOG.info("Application completed successfully");
173      System.exit(0);                   
174    } 
175    LOG.error("Application failed to complete successfully");
176    System.exit(2);
177  }
178
/**
 * Creates a client that keeps the given configuration and passes it to the
 * inherited {@code init(Configuration)}.
 *
 * @param conf Configuration used later by {@link #run()} for file system
 *             access and classpath lookup
 * @throws Exception declared by this constructor; NOTE(review): confirm which
 *                   of the calls made here can actually throw
 */
public Client(Configuration conf) throws Exception  {
  // The superclass no-arg constructor is invoked implicitly
  this.conf = conf;
  init(conf);
}
186
/**
 * Creates a client backed by a newly constructed {@link Configuration}.
 *
 * @throws Exception propagated from {@link #Client(Configuration)}
 */
public Client() throws Exception  {
  this(new Configuration());
}
192
193  /**
194   * Helper function to print out usage
195   * @param opts Parsed command line options 
196   */
197  private void printUsage(Options opts) {
198    new HelpFormatter().printHelp("Client", opts);
199  }
200
201  /**
202   * Parse command line options
203   * @param args Parsed command line options 
204   * @return Whether the init was successful to run the client
205   */
206  public boolean init(String[] args) throws ParseException {
207
208    Options opts = new Options();
209    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
210    opts.addOption("priority", true, "Application Priority. Default 0");
211    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
212    opts.addOption("user", true, "User to run the application as");
213    opts.addOption("timeout", true, "Application timeout in milliseconds");
214    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
215    opts.addOption("jar", true, "Jar file containing the application master");
216    opts.addOption("class", true, "Main class to  be run for the Application Master.");
217    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
218    opts.addOption("shell_script", true, "Location of the shell script to be executed");
219    opts.addOption("shell_args", true, "Command line args for the shell script");
220    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
221    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");            
222    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
223    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
224    opts.addOption("log_properties", true, "log4j.properties file");
225    opts.addOption("debug", false, "Dump out debug information");
226    opts.addOption("help", false, "Print usage");
227    CommandLine cliParser = new GnuParser().parse(opts, args);
228
229    if (args.length == 0) {
230      printUsage(opts);
231      throw new IllegalArgumentException("No args specified for client to initialize");
232    }           
233
234    if (cliParser.hasOption("help")) {
235      printUsage(opts);
236      return false;
237    }
238
239    if (cliParser.hasOption("debug")) {
240      debugFlag = true;
241
242    }
243
244    appName = cliParser.getOptionValue("appname", "DistributedShell");
245    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
246    amQueue = cliParser.getOptionValue("queue", "");
247    amUser = cliParser.getOptionValue("user", "");
248    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));               
249
250    if (amMemory < 0) {
251      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
252          + " Specified memory=" + amMemory);
253    }
254
255    if (!cliParser.hasOption("jar")) {
256      throw new IllegalArgumentException("No jar file specified for application master");
257    }           
258
259    appMasterJar = cliParser.getOptionValue("jar");
260    appMasterMainClass = cliParser.getOptionValue("class",
261        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");              
262
263    if (!cliParser.hasOption("shell_command")) {
264      throw new IllegalArgumentException("No shell command specified to be executed by application master");
265    }
266    shellCommand = cliParser.getOptionValue("shell_command");
267
268    if (cliParser.hasOption("shell_script")) {
269      shellScriptPath = cliParser.getOptionValue("shell_script");
270    }
271    if (cliParser.hasOption("shell_args")) {
272      shellArgs = cliParser.getOptionValue("shell_args");
273    }
274    if (cliParser.hasOption("shell_env")) { 
275      String envs[] = cliParser.getOptionValues("shell_env");
276      for (String env : envs) {
277        env = env.trim();
278        int index = env.indexOf('=');
279        if (index == -1) {
280          shellEnv.put(env, "");
281          continue;
282        }
283        String key = env.substring(0, index);
284        String val = "";
285        if (index < (env.length()-1)) {
286          val = env.substring(index+1);
287        }
288        shellEnv.put(key, val);
289      }
290    }
291    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
292
293    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
294    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
295
296    if (containerMemory < 0 || numContainers < 1) {
297      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
298          + " Specified containerMemory=" + containerMemory
299          + ", numContainer=" + numContainers);
300    }
301
302    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
303
304    log4jPropFile = cliParser.getOptionValue("log_properties", "");
305
306    return true;
307  }
308
309  /**
310   * Main run function for the client
311   * @return true if application completed successfully
312   * @throws IOException
313   */
314  public boolean run() throws IOException {
315
316    LOG.info("Running Client");
317    start();
318
319    YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
320    LOG.info("Got Cluster metric info from ASM" 
321        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
322
323    List<NodeReport> clusterNodeReports = super.getNodeReports();
324    LOG.info("Got Cluster node info from ASM");
325    for (NodeReport node : clusterNodeReports) {
326      LOG.info("Got node report from ASM for"
327          + ", nodeId=" + node.getNodeId() 
328          + ", nodeAddress" + node.getHttpAddress()
329          + ", nodeRackName" + node.getRackName()
330          + ", nodeNumContainers" + node.getNumContainers()
331          + ", nodeHealthStatus" + node.getNodeHealthStatus());
332    }
333
334    QueueInfo queueInfo = super.getQueueInfo(this.amQueue);             
335    LOG.info("Queue info"
336        + ", queueName=" + queueInfo.getQueueName()
337        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
338        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
339        + ", queueApplicationCount=" + queueInfo.getApplications().size()
340        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
341
342    List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();                                
343    for (QueueUserACLInfo aclInfo : listAclInfo) {
344      for (QueueACL userAcl : aclInfo.getUserAcls()) {
345        LOG.info("User ACL Info for Queue"
346            + ", queueName=" + aclInfo.getQueueName()                   
347            + ", userAcl=" + userAcl.name());
348      }
349    }           
350
351    // Get a new application id 
352    GetNewApplicationResponse newApp = super.getNewApplication();
353    ApplicationId appId = newApp.getApplicationId();
354
355    // TODO get min/max resource capabilities from RM and change memory ask if needed
356    // If we do not have min/max, we may not be able to correctly request 
357    // the required resources from the RM for the app master
358    // Memory ask has to be a multiple of min and less than max. 
359    // Dump out information about cluster capability as seen by the resource manager
360    int minMem = newApp.getMinimumResourceCapability().getMemory();
361    int maxMem = newApp.getMaximumResourceCapability().getMemory();
362    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
363    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
364
365    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
366    // a multiple of the min value and cannot exceed the max. 
367    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
368    if (amMemory < minMem) {
369      LOG.info("AM memory specified below min threshold of cluster. Using min value."
370          + ", specified=" + amMemory
371          + ", min=" + minMem);
372      amMemory = minMem; 
373    } 
374    else if (amMemory > maxMem) {
375      LOG.info("AM memory specified above max threshold of cluster. Using max value."
376          + ", specified=" + amMemory
377          + ", max=" + maxMem);
378      amMemory = maxMem;
379    }                           
380
381    // Create launch context for app master
382    LOG.info("Setting up application submission context for ASM");
383    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
384
385    // set the application id 
386    appContext.setApplicationId(appId);
387    // set the application name
388    appContext.setApplicationName(appName);
389
390    // Set up the container launch context for the application master
391    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
392
393    // set local resources for the application master
394    // local files or archives as needed
395    // In this scenario, the jar file for the application master is part of the local resources                 
396    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
397
398    LOG.info("Copy App Master jar from local filesystem and add to local environment");
399    // Copy the application master jar to the filesystem 
400    // Create a local resource to point to the destination jar path 
401    FileSystem fs = FileSystem.get(conf);
402    Path src = new Path(appMasterJar);
403    String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";           
404    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
405    fs.copyFromLocalFile(false, true, src, dst);
406    FileStatus destStatus = fs.getFileStatus(dst);
407    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
408
409    // Set the type of resource - file or archive
410    // archives are untarred at destination
411    // we don't need the jar file to be untarred for now
412    amJarRsrc.setType(LocalResourceType.FILE);
413    // Set visibility of the resource 
414    // Setting to most private option
415    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
416    // Set the resource to be copied over
417    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
418    // Set timestamp and length of file so that the framework 
419    // can do basic sanity checks for the local resource 
420    // after it has been copied over to ensure it is the same 
421    // resource the client intended to use with the application
422    amJarRsrc.setTimestamp(destStatus.getModificationTime());
423    amJarRsrc.setSize(destStatus.getLen());
424    localResources.put("AppMaster.jar",  amJarRsrc);
425
426    // Set the log4j properties if needed 
427    if (!log4jPropFile.isEmpty()) {
428      Path log4jSrc = new Path(log4jPropFile);
429      Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
430      fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
431      FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
432      LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
433      log4jRsrc.setType(LocalResourceType.FILE);
434      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);        
435      log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
436      log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
437      log4jRsrc.setSize(log4jFileStatus.getLen());
438      localResources.put("log4j.properties", log4jRsrc);
439    }                   
440
441    // The shell script has to be made available on the final container(s)
442    // where it will be executed. 
443    // To do this, we need to first copy into the filesystem that is visible 
444    // to the yarn framework. 
445    // We do not need to set this as a local resource for the application 
446    // master as the application master does not need it.               
447    String hdfsShellScriptLocation = ""; 
448    long hdfsShellScriptLen = 0;
449    long hdfsShellScriptTimestamp = 0;
450    if (!shellScriptPath.isEmpty()) {
451      Path shellSrc = new Path(shellScriptPath);
452      String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
453      Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
454      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
455      hdfsShellScriptLocation = shellDst.toUri().toString(); 
456      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
457      hdfsShellScriptLen = shellFileStatus.getLen();
458      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
459    }
460
461    // Set local resource info into app master container launch context
462    amContainer.setLocalResources(localResources);
463
464    // Set the necessary security tokens as needed
465    //amContainer.setContainerTokens(containerToken);
466
467    // Set the env variables to be setup in the env where the application master will be run
468    LOG.info("Set the environment for the application master");
469    Map<String, String> env = new HashMap<String, String>();
470
471    // put location of shell script into env
472    // using the env info, the application master will create the correct local resource for the 
473    // eventual containers that will be launched to execute the shell scripts
474    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
475    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
476    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
477
478    // Add AppMaster.jar location to classpath          
479    // At some point we should not be required to add 
480    // the hadoop specific classpaths to the env. 
481    // It should be provided out of the box. 
482    // For now setting all required classpaths including
483    // the classpath to "." for the application jar
484    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
485    for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
486        .split(",")) {
487      classPathEnv.append(':');
488      classPathEnv.append(c.trim());
489    }
490    classPathEnv.append(":./log4j.properties");
491
492    // add the runtime classpath needed for tests to work
493    String testRuntimeClassPath = Client.getTestRuntimeClasspath();
494    classPathEnv.append(':');
495    classPathEnv.append(testRuntimeClassPath);
496
497    env.put("CLASSPATH", classPathEnv.toString());
498
499    amContainer.setEnvironment(env);
500
501    // Set the necessary command to execute the application master 
502    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
503
504    // Set java executable command 
505    LOG.info("Setting up app master command");
506    vargs.add("${JAVA_HOME}" + "/bin/java");
507    // Set Xmx based on am memory size
508    vargs.add("-Xmx" + amMemory + "m");
509    // Set class name 
510    vargs.add(appMasterMainClass);
511    // Set params for Application Master
512    vargs.add("--container_memory " + String.valueOf(containerMemory));
513    vargs.add("--num_containers " + String.valueOf(numContainers));
514    vargs.add("--priority " + String.valueOf(shellCmdPriority));
515    if (!shellCommand.isEmpty()) {
516      vargs.add("--shell_command " + shellCommand + "");
517    }
518    if (!shellArgs.isEmpty()) {
519      vargs.add("--shell_args " + shellArgs + "");
520    }
521    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
522      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
523    }                   
524    if (debugFlag) {
525      vargs.add("--debug");
526    }
527
528    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
529    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
530
531    // Get final commmand
532    StringBuilder command = new StringBuilder();
533    for (CharSequence str : vargs) {
534      command.append(str).append(" ");
535    }
536
537    LOG.info("Completed setting up app master command " + command.toString());     
538    List<String> commands = new ArrayList<String>();
539    commands.add(command.toString());           
540    amContainer.setCommands(commands);
541
542    // For launching an AM Container, setting user here is not needed
543    // Set user in ApplicationSubmissionContext
544    // amContainer.setUser(amUser);
545
546    // Set up resource type requirements
547    // For now, only memory is supported so we set memory requirements
548    Resource capability = Records.newRecord(Resource.class);
549    capability.setMemory(amMemory);
550    amContainer.setResource(capability);
551
552    // Service data is a binary blob that can be passed to the application
553    // Not needed in this scenario
554    // amContainer.setServiceData(serviceData);
555
556    // The following are not required for launching an application master 
557    // amContainer.setContainerId(containerId);         
558
559    appContext.setAMContainerSpec(amContainer);
560
561    // Set the priority for the application master
562    Priority pri = Records.newRecord(Priority.class);
563    // TODO - what is the range for priority? how to decide? 
564    pri.setPriority(amPriority);
565    appContext.setPriority(pri);
566
567    // Set the queue to which this application is to be submitted in the RM
568    appContext.setQueue(amQueue);
569    // Set the user submitting this application 
570    // TODO can it be empty? 
571    appContext.setUser(amUser);
572
573    // Submit the application to the applications manager
574    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
575    // Ignore the response as either a valid response object is returned on success 
576    // or an exception thrown to denote some form of a failure
577    LOG.info("Submitting application to ASM");
578    super.submitApplication(appContext);
579
580    // TODO
581    // Try submitting the same request again
582    // app submission failure?
583
584    // Monitor the application
585    return monitorApplication(appId);
586
587  }
588
589  /**
590   * Monitor the submitted application for completion. 
591   * Kill application if time expires. 
592   * @param appId Application Id of application to be monitored
593   * @return true if application completed successfully
594   * @throws YarnRemoteException
595   */
596  private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
597
598    while (true) {
599
600      // Check app status every 1 second.
601      try {
602        Thread.sleep(1000);
603      } catch (InterruptedException e) {
604        LOG.debug("Thread sleep in monitoring loop interrupted");
605      }
606
607      // Get application report for the appId we are interested in 
608      ApplicationReport report = super.getApplicationReport(appId);
609
610      LOG.info("Got application report from ASM for"
611          + ", appId=" + appId.getId()
612          + ", clientToken=" + report.getClientToken()
613          + ", appDiagnostics=" + report.getDiagnostics()
614          + ", appMasterHost=" + report.getHost()
615          + ", appQueue=" + report.getQueue()
616          + ", appMasterRpcPort=" + report.getRpcPort()
617          + ", appStartTime=" + report.getStartTime()
618          + ", yarnAppState=" + report.getYarnApplicationState().toString()
619          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
620          + ", appTrackingUrl=" + report.getTrackingUrl()
621          + ", appUser=" + report.getUser());
622
623      YarnApplicationState state = report.getYarnApplicationState();
624      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
625      if (YarnApplicationState.FINISHED == state) {
626        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
627          LOG.info("Application has completed successfully. Breaking monitoring loop");
628          return true;        
629        }
630        else {
631          LOG.info("Application did finished unsuccessfully."
632              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
633              + ". Breaking monitoring loop");
634          return false;
635        }                         
636      }
637      else if (YarnApplicationState.KILLED == state     
638          || YarnApplicationState.FAILED == state) {
639        LOG.info("Application did not finish."
640            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
641            + ". Breaking monitoring loop");
642        return false;
643      }                 
644
645      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
646        LOG.info("Reached client specified timeout for application. Killing application");
647        forceKillApplication(appId);
648        return false;                           
649      }
650    }                   
651
652  }
653
/**
 * Kill a submitted application by sending a call to the ASM.
 * Delegates to the inherited {@code killApplication(ApplicationId)}.
 *
 * @param appId Application Id to be killed. 
 * @throws YarnRemoteException propagated from the kill request
 */
private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
  // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
  // the same time. 
  // If yes, can we kill a particular attempt only?

  // Response can be ignored as it is non-null on success or 
  // throws an exception in case of failures
  super.killApplication(appId);       
}
668
669  private static String getTestRuntimeClasspath() {
670
671    InputStream classpathFileStream = null;
672    BufferedReader reader = null;
673    String envClassPath = "";
674
675    LOG.info("Trying to generate classpath for app master from current thread's classpath");
676    try {
677
678      // Create classpath from generated classpath
679      // Check maven ppom.xml for generated classpath info
680      // Works if compile time env is same as runtime. Mainly tests.
681      ClassLoader thisClassLoader =
682          Thread.currentThread().getContextClassLoader();
683      String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
684      classpathFileStream =
685          thisClassLoader.getResourceAsStream(generatedClasspathFile);
686      if (classpathFileStream == null) {
687        LOG.info("Could not classpath resource from class loader");
688        return envClassPath;
689      }
690      LOG.info("Readable bytes from stream=" + classpathFileStream.available());
691      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
692      String cp = reader.readLine();
693      if (cp != null) {
694        envClassPath += cp.trim() + ":";
695      }
696      // Put the file itself on classpath for tasks.
697      envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
698    } catch (IOException e) {
699      LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
700    } 
701
702    try {
703      if (classpathFileStream != null) {
704        classpathFileStream.close();
705      }
706      if (reader != null) {
707        reader.close();
708      }
709    } catch (IOException e) {
710      LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
711    } 
712    return envClassPath;
713  }                     
714
715}