/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.distributedshell;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

/**
 * Client for Distributed Shell application submission to YARN.
 *
 * <p> The distributed shell client allows an application master to be launched that in turn would run
 * the provided shell command on a set of containers. </p>
 *
 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
 *
 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
 * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol}
 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
 *
 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
 * and application name, user submitting the application, the priority assigned to the application and the queue
 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
 * the {@link ApplicationMaster} is launched. </p>
 *
 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
 *
 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class Client extends YarnClientImpl {

  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration
  private Configuration conf;

  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // User to run app master as
  private String amUser = "";
  // Amt. of memory resource to request for to run the App Master
  private int amMemory = 10;

  // Application master jar file
  private String appMasterJar = "";
  // Main class to invoke application master
  private String appMasterMainClass = "";

  // Shell command to be executed
  private String shellCommand = "";
  // Location of shell script
  private String shellScriptPath = "";
  // Args to be passed to the shell command
  private String shellArgs = "";
  // Env variables to be setup for the shell command
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority
  private int shellCmdPriority = 0;

  // Amt of memory to request for container in which shell script will be executed
  private int containerMemory = 10;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;

  // log4j.properties file
  // if available, add to local resources and set into classpath
  private String log4jPropFile = "";

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // Debug flag
  boolean debugFlag = false;

  /**
   * Command line entry point. Exits the JVM with status 0 when the application
   * completed successfully (or only usage was requested), 1 when an exception
   * was thrown while initializing or running, and 2 when the application did
   * not complete successfully.
   *
   * @param args Command line arguments
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      Client client = new Client();
      LOG.info("Initializing Client");
      boolean doRun = client.init(args);
      if (!doRun) {
        System.exit(0);
      }
      result = client.run();
    } catch (Throwable t) {
      LOG.fatal("Error running Client", t);
      System.exit(1);
    }
    if (result) {
      LOG.info("Application completed successfully");
      System.exit(0);
    }
    LOG.error("Application failed to complete successfully");
    System.exit(2);
  }

  /**
   * Creates a client that uses the given configuration and initializes the
   * underlying {@link YarnClientImpl} with it.
   *
   * @param conf Configuration to be used by this client
   * @throws Exception declared by the original signature; kept for callers
   */
  public Client(Configuration conf) throws Exception {
    super();
    this.conf = conf;
    init(conf);
  }

  /**
   * Creates a client backed by a freshly constructed {@link Configuration}.
   *
   * @throws Exception declared by the original signature; kept for callers
   */
  public Client() throws Exception {
    this(new Configuration());
  }

  /**
   * Helper function to print out usage
   * @param opts Command line options supported by this client
   */
  private void printUsage(Options opts) {
    new HelpFormatter().printHelp("Client", opts);
  }

  /**
   * Parse command line options and store them in this client's fields.
   *
   * @param args Command line arguments to parse
   * @return true if the client should go on to {@link #run()}; false if only
   *         usage was requested via the help option
   * @throws ParseException if the arguments cannot be parsed
   * @throws IllegalArgumentException if no arguments are given, a required
   *         option (jar, shell_command) is missing, or a numeric option is
   *         out of range
   */
  public boolean init(String[] args) throws ParseException {

    Options opts = new Options();
    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
    opts.addOption("user", true, "User to run the application as");
    opts.addOption("timeout", true, "Application timeout in milliseconds");
    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
    opts.addOption("jar", true, "Jar file containing the application master");
    opts.addOption("class", true, "Main class to be run for the Application Master.");
    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
    opts.addOption("shell_script", true, "Location of the shell script to be executed");
    opts.addOption("shell_args", true, "Command line args for the shell script");
    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
    opts.addOption("log_properties", true, "log4j.properties file");
    opts.addOption("debug", false, "Dump out debug information");
    opts.addOption("help", false, "Print usage");
    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (args.length == 0) {
      printUsage(opts);
      throw new IllegalArgumentException("No args specified for client to initialize");
    }

    if (cliParser.hasOption("help")) {
      printUsage(opts);
      return false;
    }

    if (cliParser.hasOption("debug")) {
      debugFlag = true;
    }

    appName = cliParser.getOptionValue("appname", "DistributedShell");
    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
    amQueue = cliParser.getOptionValue("queue", "");
    amUser = cliParser.getOptionValue("user", "");
    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));

    if (amMemory < 0) {
      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
          + " Specified memory=" + amMemory);
    }

    if (!cliParser.hasOption("jar")) {
      throw new IllegalArgumentException("No jar file specified for application master");
    }

    appMasterJar = cliParser.getOptionValue("jar");
    appMasterMainClass = cliParser.getOptionValue("class",
        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");

    if (!cliParser.hasOption("shell_command")) {
      throw new IllegalArgumentException("No shell command specified to be executed by application master");
    }
    shellCommand = cliParser.getOptionValue("shell_command");

    if (cliParser.hasOption("shell_script")) {
      shellScriptPath = cliParser.getOptionValue("shell_script");
    }
    if (cliParser.hasOption("shell_args")) {
      shellArgs = cliParser.getOptionValue("shell_args");
    }
    if (cliParser.hasOption("shell_env")) {
      String envs[] = cliParser.getOptionValues("shell_env");
      for (String env : envs) {
        env = env.trim();
        int index = env.indexOf('=');
        if (index == -1) {
          // No '=' present: treat the whole token as a key with an empty value
          shellEnv.put(env, "");
          continue;
        }
        String key = env.substring(0, index);
        String val = "";
        if (index < (env.length()-1)) {
          val = env.substring(index+1);
        }
        shellEnv.put(key, val);
      }
    }
    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));

    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));

    if (containerMemory < 0 || numContainers < 1) {
      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
          + " Specified containerMemory=" + containerMemory
          + ", numContainer=" + numContainers);
    }

    // clientTimeout is a long, so parse it as one instead of truncating the accepted range to int
    clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));

    log4jPropFile = cliParser.getOptionValue("log_properties", "");

    return true;
  }

  /**
   * Main run function for the client. Logs cluster, node, queue and ACL
   * information, copies the application master jar (and, if given, the log4j
   * properties file and the shell script) to the filesystem, builds the
   * application submission context, submits the application and then monitors
   * it until it finishes or the client timeout expires.
   *
   * @return true if application completed successfully
   * @throws IOException
   */
  public boolean run() throws IOException {

    LOG.info("Running Client");
    start();

    YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
    LOG.info("Got Cluster metric info from ASM"
        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

    List<NodeReport> clusterNodeReports = super.getNodeReports();
    LOG.info("Got Cluster node info from ASM");
    for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress=" + node.getHttpAddress()
          + ", nodeRackName=" + node.getRackName()
          + ", nodeNumContainers=" + node.getNumContainers()
          + ", nodeHealthStatus=" + node.getNodeHealthStatus());
    }

    QueueInfo queueInfo = super.getQueueInfo(this.amQueue);
    LOG.info("Queue info"
        + ", queueName=" + queueInfo.getQueueName()
        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
        + ", queueApplicationCount=" + queueInfo.getApplications().size()
        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

    List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
    for (QueueUserACLInfo aclInfo : listAclInfo) {
      for (QueueACL userAcl : aclInfo.getUserAcls()) {
        LOG.info("User ACL Info for Queue"
            + ", queueName=" + aclInfo.getQueueName()
            + ", userAcl=" + userAcl.name());
      }
    }

    // Get a new application id
    GetNewApplicationResponse newApp = super.getNewApplication();
    ApplicationId appId = newApp.getApplicationId();

    // TODO get min/max resource capabilities from RM and change memory ask if needed
    // If we do not have min/max, we may not be able to correctly request
    // the required resources from the RM for the app master
    // Memory ask has to be a multiple of min and less than max.
    // Dump out information about cluster capability as seen by the resource manager
    int minMem = newApp.getMinimumResourceCapability().getMemory();
    int maxMem = newApp.getMaximumResourceCapability().getMemory();
    LOG.info("Min mem capability of resources in this cluster " + minMem);
    LOG.info("Max mem capability of resources in this cluster " + maxMem);

    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
    // a multiple of the min value and cannot exceed the max.
    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
    if (amMemory < minMem) {
      LOG.info("AM memory specified below min threshold of cluster. Using min value."
          + ", specified=" + amMemory
          + ", min=" + minMem);
      amMemory = minMem;
    }
    else if (amMemory > maxMem) {
      LOG.info("AM memory specified above max threshold of cluster. Using max value."
          + ", specified=" + amMemory
          + ", max=" + maxMem);
      amMemory = maxMem;
    }

    // Create launch context for app master
    LOG.info("Setting up application submission context for ASM");
    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);

    // set the application id
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName(appName);

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Copy the application master jar to the filesystem
    // Create a local resource to point to the destination jar path
    FileSystem fs = FileSystem.get(conf);
    Path src = new Path(appMasterJar);
    String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
    fs.copyFromLocalFile(false, true, src, dst);
    FileStatus destStatus = fs.getFileStatus(dst);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);

    // Set the type of resource - file or archive
    // archives are untarred at destination
    // we don't need the jar file to be untarred for now
    amJarRsrc.setType(LocalResourceType.FILE);
    // Set visibility of the resource
    // Setting to most private option
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    // Set the resource to be copied over
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
    // Set timestamp and length of file so that the framework
    // can do basic sanity checks for the local resource
    // after it has been copied over to ensure it is the same
    // resource the client intended to use with the application
    amJarRsrc.setTimestamp(destStatus.getModificationTime());
    amJarRsrc.setSize(destStatus.getLen());
    localResources.put("AppMaster.jar", amJarRsrc);

    // Set the log4j properties if needed
    if (!log4jPropFile.isEmpty()) {
      Path log4jSrc = new Path(log4jPropFile);
      Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
      fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
      FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
      LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
      log4jRsrc.setType(LocalResourceType.FILE);
      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
      log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
      log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
      log4jRsrc.setSize(log4jFileStatus.getLen());
      localResources.put("log4j.properties", log4jRsrc);
    }

    // The shell script has to be made available on the final container(s)
    // where it will be executed.
    // To do this, we need to first copy into the filesystem that is visible
    // to the yarn framework.
    // We do not need to set this as a local resource for the application
    // master as the application master does not need it.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
      Path shellSrc = new Path(shellScriptPath);
      String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
      Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
      hdfsShellScriptLocation = shellDst.toUri().toString();
      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
      hdfsShellScriptLen = shellFileStatus.getLen();
      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }

    // Set local resource info into app master container launch context
    amContainer.setLocalResources(localResources);

    // Set the necessary security tokens as needed
    //amContainer.setContainerTokens(containerToken);

    // Set the env variables to be setup in the env where the application master will be run
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();

    // put location of shell script into env
    // using the env info, the application master will create the correct local resource for the
    // eventual containers that will be launched to execute the shell scripts
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

    // Add AppMaster.jar location to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
    // The property may be absent from the configuration; skip it rather than fail with an NPE
    String yarnAppClassPath = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
    if (yarnAppClassPath != null) {
      for (String c : yarnAppClassPath.split(",")) {
        classPathEnv.append(':');
        classPathEnv.append(c.trim());
      }
    }
    classPathEnv.append(":./log4j.properties");

    // add the runtime classpath needed for tests to work
    String testRuntimeClassPath = Client.getTestRuntimeClasspath();
    classPathEnv.append(':');
    classPathEnv.append(testRuntimeClassPath);

    env.put("CLASSPATH", classPathEnv.toString());

    amContainer.setEnvironment(env);

    // Set the necessary command to execute the application master
    List<CharSequence> vargs = new ArrayList<CharSequence>(30);

    // Set java executable command
    LOG.info("Setting up app master command");
    vargs.add("${JAVA_HOME}" + "/bin/java");
    // Set Xmx based on am memory size
    vargs.add("-Xmx" + amMemory + "m");
    // Set class name
    vargs.add(appMasterMainClass);
    // Set params for Application Master
    vargs.add("--container_memory " + String.valueOf(containerMemory));
    vargs.add("--num_containers " + String.valueOf(numContainers));
    vargs.add("--priority " + String.valueOf(shellCmdPriority));
    if (!shellCommand.isEmpty()) {
      vargs.add("--shell_command " + shellCommand + "");
    }
    if (!shellArgs.isEmpty()) {
      vargs.add("--shell_args " + shellArgs + "");
    }
    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
      vargs.add("--debug");
    }

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

    // Get final command
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    amContainer.setCommands(commands);

    // For launching an AM Container, setting user here is not needed
    // Set user in ApplicationSubmissionContext
    // amContainer.setUser(amUser);

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);

    // Service data is a binary blob that can be passed to the application
    // Not needed in this scenario
    // amContainer.setServiceData(serviceData);

    // The following are not required for launching an application master
    // amContainer.setContainerId(containerId);

    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
    Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
    pri.setPriority(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue(amQueue);
    // Set the user submitting this application
    // TODO can it be empty?
    appContext.setUser(amUser);

    // Submit the application to the applications manager
    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
    // Ignore the response as either a valid response object is returned on success
    // or an exception thrown to denote some form of a failure
    LOG.info("Submitting application to ASM");
    super.submitApplication(appContext);

    // TODO
    // Try submitting the same request again
    // app submission failure?

    // Monitor the application
    return monitorApplication(appId);

  }

  /**
   * Monitor the submitted application for completion.
   * Kill application if time expires.
   *
   * <p>An interrupt received while sleeping between polls does not stop the
   * monitoring; the thread's interrupt status is restored when this method
   * exits so that callers can still observe it.</p>
   *
   * @param appId Application Id of application to be monitored
   * @return true if application completed successfully
   * @throws YarnRemoteException
   */
  private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {

    // Remember interrupts instead of re-asserting them immediately: re-interrupting
    // inside the loop would make every subsequent sleep() return at once and turn
    // the one second poll into a busy loop.
    boolean interrupted = false;
    try {
      while (true) {

        // Check app status every 1 second.
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          interrupted = true;
          LOG.debug("Thread sleep in monitoring loop interrupted");
        }

        // Get application report for the appId we are interested in
        ApplicationReport report = super.getApplicationReport(appId);

        LOG.info("Got application report from ASM for"
            + ", appId=" + appId.getId()
            + ", clientToken=" + report.getClientToken()
            + ", appDiagnostics=" + report.getDiagnostics()
            + ", appMasterHost=" + report.getHost()
            + ", appQueue=" + report.getQueue()
            + ", appMasterRpcPort=" + report.getRpcPort()
            + ", appStartTime=" + report.getStartTime()
            + ", yarnAppState=" + report.getYarnApplicationState().toString()
            + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
            + ", appTrackingUrl=" + report.getTrackingUrl()
            + ", appUser=" + report.getUser());

        YarnApplicationState state = report.getYarnApplicationState();
        FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
        if (YarnApplicationState.FINISHED == state) {
          if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
            LOG.info("Application has completed successfully. Breaking monitoring loop");
            return true;
          }
          else {
            LOG.info("Application finished unsuccessfully."
                + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                + ". Breaking monitoring loop");
            return false;
          }
        }
        else if (YarnApplicationState.KILLED == state
            || YarnApplicationState.FAILED == state) {
          LOG.info("Application did not finish."
              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
              + ". Breaking monitoring loop");
          return false;
        }

        if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
          LOG.info("Reached client specified timeout for application. Killing application");
          forceKillApplication(appId);
          return false;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }

  }

  /**
   * Kill a submitted application by sending a call to the ASM
   * @param appId Application Id to be killed.
   * @throws YarnRemoteException
   */
  private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
    // the same time.
    // If yes, can we kill a particular attempt only?

    // Response can be ignored as it is non-null on success or
    // throws an exception in case of failures
    super.killApplication(appId);
  }

  /**
   * Builds the extra classpath needed by tests from the
   * <code>yarn-apps-ds-generated-classpath</code> resource found via the current
   * thread's context class loader. The result is the trimmed first line of that
   * resource followed by ':' and the location of the resource itself.
   *
   * @return the generated classpath, or an empty string if the resource is not
   *         available; a partially built value may be returned if reading fails
   */
  private static String getTestRuntimeClasspath() {

    InputStream classpathFileStream = null;
    BufferedReader reader = null;
    String envClassPath = "";

    LOG.info("Trying to generate classpath for app master from current thread's classpath");
    try {

      // Create classpath from generated classpath
      // Check maven pom.xml for generated classpath info
      // Works if compile time env is same as runtime. Mainly tests.
      ClassLoader thisClassLoader =
          Thread.currentThread().getContextClassLoader();
      String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
      classpathFileStream =
          thisClassLoader.getResourceAsStream(generatedClasspathFile);
      if (classpathFileStream == null) {
        LOG.info("Could not find classpath resource from class loader");
        return envClassPath;
      }
      LOG.info("Readable bytes from stream=" + classpathFileStream.available());
      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
      String cp = reader.readLine();
      if (cp != null) {
        envClassPath += cp.trim() + ":";
      }
      // Put the file itself on classpath for tasks.
      envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
    } catch (IOException e) {
      LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
    } finally {
      // Release the resources on every exit path, including unchecked exceptions.
      // Closing the reader also closes the stream it wraps.
      try {
        if (reader != null) {
          reader.close();
        } else if (classpathFileStream != null) {
          classpathFileStream.close();
        }
      } catch (IOException e) {
        LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
      }
    }
    return envClassPath;
  }

}