001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.BufferedReader;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.InputStreamReader;
025    import java.util.ArrayList;
026    import java.util.HashMap;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Vector;
030    
031    import org.apache.commons.cli.CommandLine;
032    import org.apache.commons.cli.GnuParser;
033    import org.apache.commons.cli.HelpFormatter;
034    import org.apache.commons.cli.Options;
035    import org.apache.commons.cli.ParseException;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    import org.apache.hadoop.classification.InterfaceAudience;
039    import org.apache.hadoop.classification.InterfaceStability;
040    import org.apache.hadoop.conf.Configuration;
041    import org.apache.hadoop.fs.FileStatus;
042    import org.apache.hadoop.fs.FileSystem;
043    import org.apache.hadoop.fs.Path;
044    import org.apache.hadoop.yarn.api.ApplicationConstants;
045    import org.apache.hadoop.yarn.api.ClientRMProtocol;
046    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
047    import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048    import org.apache.hadoop.yarn.api.records.ApplicationId;
049    import org.apache.hadoop.yarn.api.records.ApplicationReport;
050    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
051    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
052    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
053    import org.apache.hadoop.yarn.api.records.LocalResource;
054    import org.apache.hadoop.yarn.api.records.LocalResourceType;
055    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
056    import org.apache.hadoop.yarn.api.records.NodeReport;
057    import org.apache.hadoop.yarn.api.records.Priority;
058    import org.apache.hadoop.yarn.api.records.QueueACL;
059    import org.apache.hadoop.yarn.api.records.QueueInfo;
060    import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
061    import org.apache.hadoop.yarn.api.records.Resource;
062    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
063    import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
064    import org.apache.hadoop.yarn.client.YarnClientImpl;
065    import org.apache.hadoop.yarn.conf.YarnConfiguration;
066    import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
067    import org.apache.hadoop.yarn.util.ConverterUtils;
068    import org.apache.hadoop.yarn.util.Records;
069    
070    /**
071     * Client for Distributed Shell application submission to YARN.
072     * 
073     * <p> The distributed shell client allows an application master to be launched that in turn would run 
074     * the provided shell command on a set of containers. </p>
075     * 
076     * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
077     * 
078     * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
079     * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} 
080     * provides a way for the client to get access to cluster information and to request for a
081     * new {@link ApplicationId}. <p>
082     * 
083     * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
084     * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
085     * and application name, the priority assigned to the application and the queue
086     * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
087     * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
088     * the {@link ApplicationMaster} is launched. </p>
089     * 
090     * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
091     * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
092     * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
093     * {@link ApplicationMaster}. <p>
094     * 
095     * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
096     * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
097     * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
098     * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
099     *
100     */
101    @InterfaceAudience.Public
102    @InterfaceStability.Unstable
103    public class Client extends YarnClientImpl {
104    
105      private static final Log LOG = LogFactory.getLog(Client.class);
106    
107      // Configuration
108      private Configuration conf;
109    
110      // Application master specific info to register a new Application with RM/ASM
111      private String appName = "";
112      // App master priority
113      private int amPriority = 0;
114      // Queue for App master
115      private String amQueue = "";
116      // Amt. of memory resource to request for to run the App Master
117      private int amMemory = 10; 
118    
119      // Application master jar file
120      private String appMasterJar = ""; 
121      // Main class to invoke application master
122      private String appMasterMainClass = "";
123    
124      // Shell command to be executed 
125      private String shellCommand = ""; 
126      // Location of shell script 
127      private String shellScriptPath = ""; 
128      // Args to be passed to the shell command
129      private String shellArgs = "";
130      // Env variables to be setup for the shell command 
131      private Map<String, String> shellEnv = new HashMap<String, String>();
132      // Shell Command Container priority 
133      private int shellCmdPriority = 0;
134    
135      // Amt of memory to request for container in which shell script will be executed
136      private int containerMemory = 10; 
137      // No. of containers in which the shell script needs to be executed
138      private int numContainers = 1;
139    
140      // log4j.properties file 
141      // if available, add to local resources and set into classpath 
142      private String log4jPropFile = "";    
143    
144      // Start time for client
145      private final long clientStartTime = System.currentTimeMillis();
146      // Timeout threshold for client. Kill app after time interval expires.
147      private long clientTimeout = 600000;
148    
149      // Debug flag
150      boolean debugFlag = false;    
151    
152      /**
153       * @param args Command line arguments 
154       */
155      public static void main(String[] args) {
156        boolean result = false;
157        try {
158          Client client = new Client();
159          LOG.info("Initializing Client");
160          boolean doRun = client.init(args);
161          if (!doRun) {
162            System.exit(0);
163          }
164          result = client.run();
165        } catch (Throwable t) {
166          LOG.fatal("Error running CLient", t);
167          System.exit(1);
168        }
169        if (result) {
170          LOG.info("Application completed successfully");
171          System.exit(0);                   
172        } 
173        LOG.error("Application failed to complete successfully");
174        System.exit(2);
175      }
176    
177      /**
178       */
179      public Client(Configuration conf) throws Exception  {
180        super();
181        this.conf = conf;
182        init(conf);
183      }
184    
185      /**
186       */
187      public Client() throws Exception  {
188        this(new Configuration());
189      }
190    
191      /**
192       * Helper function to print out usage
193       * @param opts Parsed command line options 
194       */
195      private void printUsage(Options opts) {
196        new HelpFormatter().printHelp("Client", opts);
197      }
198    
199      /**
200       * Parse command line options
201       * @param args Parsed command line options 
202       * @return Whether the init was successful to run the client
203       * @throws ParseException
204       */
205      public boolean init(String[] args) throws ParseException {
206    
207        Options opts = new Options();
208        opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
209        opts.addOption("priority", true, "Application Priority. Default 0");
210        opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
211        opts.addOption("timeout", true, "Application timeout in milliseconds");
212        opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
213        opts.addOption("jar", true, "Jar file containing the application master");
214        opts.addOption("class", true, "Main class to  be run for the Application Master.");
215        opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
216        opts.addOption("shell_script", true, "Location of the shell script to be executed");
217        opts.addOption("shell_args", true, "Command line args for the shell script");
218        opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
219        opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");            
220        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
221        opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
222        opts.addOption("log_properties", true, "log4j.properties file");
223        opts.addOption("debug", false, "Dump out debug information");
224        opts.addOption("help", false, "Print usage");
225        CommandLine cliParser = new GnuParser().parse(opts, args);
226    
227        if (args.length == 0) {
228          printUsage(opts);
229          throw new IllegalArgumentException("No args specified for client to initialize");
230        }           
231    
232        if (cliParser.hasOption("help")) {
233          printUsage(opts);
234          return false;
235        }
236    
237        if (cliParser.hasOption("debug")) {
238          debugFlag = true;
239    
240        }
241    
242        appName = cliParser.getOptionValue("appname", "DistributedShell");
243        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
244        amQueue = cliParser.getOptionValue("queue", "default");
245        amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));               
246    
247        if (amMemory < 0) {
248          throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
249              + " Specified memory=" + amMemory);
250        }
251    
252        if (!cliParser.hasOption("jar")) {
253          throw new IllegalArgumentException("No jar file specified for application master");
254        }           
255    
256        appMasterJar = cliParser.getOptionValue("jar");
257        appMasterMainClass = cliParser.getOptionValue("class",
258            "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");              
259    
260        if (!cliParser.hasOption("shell_command")) {
261          throw new IllegalArgumentException("No shell command specified to be executed by application master");
262        }
263        shellCommand = cliParser.getOptionValue("shell_command");
264    
265        if (cliParser.hasOption("shell_script")) {
266          shellScriptPath = cliParser.getOptionValue("shell_script");
267        }
268        if (cliParser.hasOption("shell_args")) {
269          shellArgs = cliParser.getOptionValue("shell_args");
270        }
271        if (cliParser.hasOption("shell_env")) { 
272          String envs[] = cliParser.getOptionValues("shell_env");
273          for (String env : envs) {
274            env = env.trim();
275            int index = env.indexOf('=');
276            if (index == -1) {
277              shellEnv.put(env, "");
278              continue;
279            }
280            String key = env.substring(0, index);
281            String val = "";
282            if (index < (env.length()-1)) {
283              val = env.substring(index+1);
284            }
285            shellEnv.put(key, val);
286          }
287        }
288        shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
289    
290        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
291        numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
292    
293        if (containerMemory < 0 || numContainers < 1) {
294          throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
295              + " Specified containerMemory=" + containerMemory
296              + ", numContainer=" + numContainers);
297        }
298    
299        clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
300    
301        log4jPropFile = cliParser.getOptionValue("log_properties", "");
302    
303        return true;
304      }
305    
306      /**
307       * Main run function for the client
308       * @return true if application completed successfully
309       * @throws IOException
310       */
311      public boolean run() throws IOException {
312    
313        LOG.info("Running Client");
314        start();
315    
316        YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
317        LOG.info("Got Cluster metric info from ASM" 
318            + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
319    
320        List<NodeReport> clusterNodeReports = super.getNodeReports();
321        LOG.info("Got Cluster node info from ASM");
322        for (NodeReport node : clusterNodeReports) {
323          LOG.info("Got node report from ASM for"
324              + ", nodeId=" + node.getNodeId() 
325              + ", nodeAddress" + node.getHttpAddress()
326              + ", nodeRackName" + node.getRackName()
327              + ", nodeNumContainers" + node.getNumContainers()
328              + ", nodeHealthStatus" + node.getNodeHealthStatus());
329        }
330    
331        QueueInfo queueInfo = super.getQueueInfo(this.amQueue);             
332        LOG.info("Queue info"
333            + ", queueName=" + queueInfo.getQueueName()
334            + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
335            + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
336            + ", queueApplicationCount=" + queueInfo.getApplications().size()
337            + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
338    
339        List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();                                
340        for (QueueUserACLInfo aclInfo : listAclInfo) {
341          for (QueueACL userAcl : aclInfo.getUserAcls()) {
342            LOG.info("User ACL Info for Queue"
343                + ", queueName=" + aclInfo.getQueueName()                   
344                + ", userAcl=" + userAcl.name());
345          }
346        }           
347    
348        // Get a new application id 
349        GetNewApplicationResponse newApp = super.getNewApplication();
350        ApplicationId appId = newApp.getApplicationId();
351    
352        // TODO get min/max resource capabilities from RM and change memory ask if needed
353        // If we do not have min/max, we may not be able to correctly request 
354        // the required resources from the RM for the app master
355        // Memory ask has to be a multiple of min and less than max. 
356        // Dump out information about cluster capability as seen by the resource manager
357        int minMem = newApp.getMinimumResourceCapability().getMemory();
358        int maxMem = newApp.getMaximumResourceCapability().getMemory();
359        LOG.info("Min mem capabililty of resources in this cluster " + minMem);
360        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
361    
362        // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
363        // a multiple of the min value and cannot exceed the max. 
364        // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
365        if (amMemory < minMem) {
366          LOG.info("AM memory specified below min threshold of cluster. Using min value."
367              + ", specified=" + amMemory
368              + ", min=" + minMem);
369          amMemory = minMem; 
370        } 
371        else if (amMemory > maxMem) {
372          LOG.info("AM memory specified above max threshold of cluster. Using max value."
373              + ", specified=" + amMemory
374              + ", max=" + maxMem);
375          amMemory = maxMem;
376        }                           
377    
378        // Create launch context for app master
379        LOG.info("Setting up application submission context for ASM");
380        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
381    
382        // set the application id 
383        appContext.setApplicationId(appId);
384        // set the application name
385        appContext.setApplicationName(appName);
386    
387        // Set up the container launch context for the application master
388        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
389    
390        // set local resources for the application master
391        // local files or archives as needed
392        // In this scenario, the jar file for the application master is part of the local resources                 
393        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
394    
395        LOG.info("Copy App Master jar from local filesystem and add to local environment");
396        // Copy the application master jar to the filesystem 
397        // Create a local resource to point to the destination jar path 
398        FileSystem fs = FileSystem.get(conf);
399        Path src = new Path(appMasterJar);
400        String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";           
401        Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
402        fs.copyFromLocalFile(false, true, src, dst);
403        FileStatus destStatus = fs.getFileStatus(dst);
404        LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
405    
406        // Set the type of resource - file or archive
407        // archives are untarred at destination
408        // we don't need the jar file to be untarred for now
409        amJarRsrc.setType(LocalResourceType.FILE);
410        // Set visibility of the resource 
411        // Setting to most private option
412        amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);          
413        // Set the resource to be copied over
414        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
415        // Set timestamp and length of file so that the framework 
416        // can do basic sanity checks for the local resource 
417        // after it has been copied over to ensure it is the same 
418        // resource the client intended to use with the application
419        amJarRsrc.setTimestamp(destStatus.getModificationTime());
420        amJarRsrc.setSize(destStatus.getLen());
421        localResources.put("AppMaster.jar",  amJarRsrc);
422    
423        // Set the log4j properties if needed 
424        if (!log4jPropFile.isEmpty()) {
425          Path log4jSrc = new Path(log4jPropFile);
426          Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
427          fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
428          FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
429          LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
430          log4jRsrc.setType(LocalResourceType.FILE);
431          log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);        
432          log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
433          log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
434          log4jRsrc.setSize(log4jFileStatus.getLen());
435          localResources.put("log4j.properties", log4jRsrc);
436        }                   
437    
438        // The shell script has to be made available on the final container(s)
439        // where it will be executed. 
440        // To do this, we need to first copy into the filesystem that is visible 
441        // to the yarn framework. 
442        // We do not need to set this as a local resource for the application 
443        // master as the application master does not need it.               
444        String hdfsShellScriptLocation = ""; 
445        long hdfsShellScriptLen = 0;
446        long hdfsShellScriptTimestamp = 0;
447        if (!shellScriptPath.isEmpty()) {
448          Path shellSrc = new Path(shellScriptPath);
449          String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
450          Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
451          fs.copyFromLocalFile(false, true, shellSrc, shellDst);
452          hdfsShellScriptLocation = shellDst.toUri().toString(); 
453          FileStatus shellFileStatus = fs.getFileStatus(shellDst);
454          hdfsShellScriptLen = shellFileStatus.getLen();
455          hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
456        }
457    
458        // Set local resource info into app master container launch context
459        amContainer.setLocalResources(localResources);
460    
461        // Set the necessary security tokens as needed
462        //amContainer.setContainerTokens(containerToken);
463    
464        // Set the env variables to be setup in the env where the application master will be run
465        LOG.info("Set the environment for the application master");
466        Map<String, String> env = new HashMap<String, String>();
467    
468        // put location of shell script into env
469        // using the env info, the application master will create the correct local resource for the 
470        // eventual containers that will be launched to execute the shell scripts
471        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
472        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
473        env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
474    
475        // Add AppMaster.jar location to classpath          
476        // At some point we should not be required to add 
477        // the hadoop specific classpaths to the env. 
478        // It should be provided out of the box. 
479        // For now setting all required classpaths including
480        // the classpath to "." for the application jar
481        StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
482        for (String c : conf.getStrings(
483            YarnConfiguration.YARN_APPLICATION_CLASSPATH,
484            YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
485          classPathEnv.append(':');
486          classPathEnv.append(c.trim());
487        }
488        classPathEnv.append(":./log4j.properties");
489    
490        // add the runtime classpath needed for tests to work
491        String testRuntimeClassPath = Client.getTestRuntimeClasspath();
492        classPathEnv.append(':');
493        classPathEnv.append(testRuntimeClassPath);
494    
495        env.put("CLASSPATH", classPathEnv.toString());
496    
497        amContainer.setEnvironment(env);
498    
499        // Set the necessary command to execute the application master 
500        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
501    
502        // Set java executable command 
503        LOG.info("Setting up app master command");
504        vargs.add("${JAVA_HOME}" + "/bin/java");
505        // Set Xmx based on am memory size
506        vargs.add("-Xmx" + amMemory + "m");
507        // Set class name 
508        vargs.add(appMasterMainClass);
509        // Set params for Application Master
510        vargs.add("--container_memory " + String.valueOf(containerMemory));
511        vargs.add("--num_containers " + String.valueOf(numContainers));
512        vargs.add("--priority " + String.valueOf(shellCmdPriority));
513        if (!shellCommand.isEmpty()) {
514          vargs.add("--shell_command " + shellCommand + "");
515        }
516        if (!shellArgs.isEmpty()) {
517          vargs.add("--shell_args " + shellArgs + "");
518        }
519        for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
520          vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
521        }                   
522        if (debugFlag) {
523          vargs.add("--debug");
524        }
525    
526        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
527        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
528    
529        // Get final commmand
530        StringBuilder command = new StringBuilder();
531        for (CharSequence str : vargs) {
532          command.append(str).append(" ");
533        }
534    
535        LOG.info("Completed setting up app master command " + command.toString());     
536        List<String> commands = new ArrayList<String>();
537        commands.add(command.toString());           
538        amContainer.setCommands(commands);
539    
540        // Set up resource type requirements
541        // For now, only memory is supported so we set memory requirements
542        Resource capability = Records.newRecord(Resource.class);
543        capability.setMemory(amMemory);
544        amContainer.setResource(capability);
545    
546        // Service data is a binary blob that can be passed to the application
547        // Not needed in this scenario
548        // amContainer.setServiceData(serviceData);
549    
550        // The following are not required for launching an application master 
551        // amContainer.setContainerId(containerId);         
552    
553        appContext.setAMContainerSpec(amContainer);
554    
555        // Set the priority for the application master
556        Priority pri = Records.newRecord(Priority.class);
557        // TODO - what is the range for priority? how to decide? 
558        pri.setPriority(amPriority);
559        appContext.setPriority(pri);
560    
561        // Set the queue to which this application is to be submitted in the RM
562        appContext.setQueue(amQueue);
563    
564        // Submit the application to the applications manager
565        // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
566        // Ignore the response as either a valid response object is returned on success 
567        // or an exception thrown to denote some form of a failure
568        LOG.info("Submitting application to ASM");
569        super.submitApplication(appContext);
570    
571        // TODO
572        // Try submitting the same request again
573        // app submission failure?
574    
575        // Monitor the application
576        return monitorApplication(appId);
577    
578      }
579    
580      /**
581       * Monitor the submitted application for completion. 
582       * Kill application if time expires. 
583       * @param appId Application Id of application to be monitored
584       * @return true if application completed successfully
585       * @throws YarnRemoteException
586       */
587      private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
588    
589        while (true) {
590    
591          // Check app status every 1 second.
592          try {
593            Thread.sleep(1000);
594          } catch (InterruptedException e) {
595            LOG.debug("Thread sleep in monitoring loop interrupted");
596          }
597    
598          // Get application report for the appId we are interested in 
599          ApplicationReport report = super.getApplicationReport(appId);
600    
601          LOG.info("Got application report from ASM for"
602              + ", appId=" + appId.getId()
603              + ", clientToken=" + report.getClientToken()
604              + ", appDiagnostics=" + report.getDiagnostics()
605              + ", appMasterHost=" + report.getHost()
606              + ", appQueue=" + report.getQueue()
607              + ", appMasterRpcPort=" + report.getRpcPort()
608              + ", appStartTime=" + report.getStartTime()
609              + ", yarnAppState=" + report.getYarnApplicationState().toString()
610              + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
611              + ", appTrackingUrl=" + report.getTrackingUrl()
612              + ", appUser=" + report.getUser());
613    
614          YarnApplicationState state = report.getYarnApplicationState();
615          FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
616          if (YarnApplicationState.FINISHED == state) {
617            if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
618              LOG.info("Application has completed successfully. Breaking monitoring loop");
619              return true;        
620            }
621            else {
622              LOG.info("Application did finished unsuccessfully."
623                  + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
624                  + ". Breaking monitoring loop");
625              return false;
626            }                         
627          }
628          else if (YarnApplicationState.KILLED == state     
629              || YarnApplicationState.FAILED == state) {
630            LOG.info("Application did not finish."
631                + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
632                + ". Breaking monitoring loop");
633            return false;
634          }                 
635    
636          if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
637            LOG.info("Reached client specified timeout for application. Killing application");
638            forceKillApplication(appId);
639            return false;                           
640          }
641        }                   
642    
643      }
644    
645      /**
646       * Kill a submitted application by sending a call to the ASM
647       * @param appId Application Id to be killed. 
648       * @throws YarnRemoteException
649       */
650      private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
651        // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
652        // the same time. 
653        // If yes, can we kill a particular attempt only?
654    
655        // Response can be ignored as it is non-null on success or 
656        // throws an exception in case of failures
657        super.killApplication(appId);       
658      }
659    
660      private static String getTestRuntimeClasspath() {
661    
662        InputStream classpathFileStream = null;
663        BufferedReader reader = null;
664        String envClassPath = "";
665    
666        LOG.info("Trying to generate classpath for app master from current thread's classpath");
667        try {
668    
669          // Create classpath from generated classpath
670          // Check maven ppom.xml for generated classpath info
671          // Works if compile time env is same as runtime. Mainly tests.
672          ClassLoader thisClassLoader =
673              Thread.currentThread().getContextClassLoader();
674          String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
675          classpathFileStream =
676              thisClassLoader.getResourceAsStream(generatedClasspathFile);
677          if (classpathFileStream == null) {
678            LOG.info("Could not classpath resource from class loader");
679            return envClassPath;
680          }
681          LOG.info("Readable bytes from stream=" + classpathFileStream.available());
682          reader = new BufferedReader(new InputStreamReader(classpathFileStream));
683          String cp = reader.readLine();
684          if (cp != null) {
685            envClassPath += cp.trim() + ":";
686          }
687          // Put the file itself on classpath for tasks.
688          envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
689        } catch (IOException e) {
690          LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
691        } 
692    
693        try {
694          if (classpathFileStream != null) {
695            classpathFileStream.close();
696          }
697          if (reader != null) {
698            reader.close();
699          }
700        } catch (IOException e) {
701          LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
702        } 
703        return envClassPath;
704      }                     
705    
706    }