/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.distributedshell;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

/**
 * Client for Distributed Shell application submission to YARN.
 *
 * <p> The distributed shell client allows an application master to be launched that in turn would run
 * the provided shell command on a set of containers. </p>
 *
 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
 *
 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
 *
 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
 * and application name, the priority assigned to the application and the queue
 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
 * the {@link ApplicationMaster} is launched. </p>
 *
 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
 *
 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class Client {

  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration
  private final Configuration conf;
  private final YarnClient yarnClient;
  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // Amt. of memory resource to request for to run the App Master
  private int amMemory = 10;
  // Amt. of virtual core resource to request for to run the App Master
  private int amVCores = 1;

  // Application master jar file
  private String appMasterJar = "";
  // Main class to invoke application master
  private final String appMasterMainClass;

  // Shell command to be executed
  private String shellCommand = "";
  // Location of shell script
  private String shellScriptPath = "";
  // Args to be passed to the shell command
  private String[] shellArgs = new String[] {};
  // Env variables to be setup for the shell command
  private final Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority
  private int shellCmdPriority = 0;

  // Amt of memory to request for container in which shell script will be executed
  private int containerMemory = 10;
  // Amt. of virtual cores to request for container in which shell script will be executed
  private int containerVirtualCores = 1;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;

  // log4j.properties file
  // if available, add to local resources and set into classpath
  private String log4jPropFile = "";

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client, in milliseconds. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // flag to indicate whether to keep containers across application attempts.
  private boolean keepContainers = false;

  // Debug flag
  boolean debugFlag = false;

  // Command line options
  private final Options opts;

  // Names under which the generated/uploaded files are registered as local resources
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  public static final String SCRIPT_PATH = "ExecScript";

  /**
   * Command line entry point. Exits with 0 on success (or when only usage was
   * requested), -1 on invalid arguments, 1 on an unexpected error and 2 when
   * the application did not complete successfully.
   *
   * @param args Command line arguments
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      Client client = new Client();
      LOG.info("Initializing Client");
      try {
        boolean doRun = client.init(args);
        if (!doRun) {
          System.exit(0);
        }
      } catch (IllegalArgumentException e) {
        System.err.println(e.getLocalizedMessage());
        client.printUsage();
        System.exit(-1);
      }
      result = client.run();
    } catch (Throwable t) {
      LOG.fatal("Error running CLient", t);
      System.exit(1);
    }
    if (result) {
      LOG.info("Application completed successfully");
      System.exit(0);
    }
    LOG.error("Application failed to complete successfully");
    System.exit(2);
  }

  /**
   * Creates a client that launches the distributed shell
   * {@code ApplicationMaster} using the given configuration.
   *
   * @param conf Configuration used to initialize the YARN client and file system
   */
  public Client(Configuration conf) throws Exception  {
    this(
      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
      conf);
  }

  /**
   * Creates a client for an arbitrary application master main class,
   * initializes the YARN client and registers the supported command line options.
   *
   * @param appMasterMainClass Fully qualified main class of the application master
   * @param conf Configuration used to initialize the YARN client and file system
   */
  Client(String appMasterMainClass, Configuration conf) {
    this.conf = conf;
    this.appMasterMainClass = appMasterMainClass;
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    opts = new Options();
    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
    opts.addOption("timeout", true, "Application timeout in milliseconds");
    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
    opts.addOption("jar", true, "Jar file containing the application master");
    opts.addOption("shell_command", true, "Shell command to be executed by " +
        "the Application Master. Can only specify either --shell_command " +
        "or --shell_script");
    opts.addOption("shell_script", true, "Location of the shell script to be " +
        "executed. Can only specify either --shell_command or --shell_script");
    opts.addOption("shell_args", true, "Command line args for the shell script." +
        "Multiple args can be separated by empty space.");
    opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
    opts.addOption("log_properties", true, "log4j.properties file");
    opts.addOption("keep_containers_across_application_attempts", false,
      "Flag to indicate whether to keep containers across application attempts." +
      " If the flag is true, running containers will not be killed when" +
      " application attempt fails and these containers will be retrieved by" +
      " the new application attempt ");
    opts.addOption("debug", false, "Dump out debug information");
    opts.addOption("help", false, "Print usage");

  }

  /**
   * Creates a client backed by a default {@link YarnConfiguration}.
   */
  public Client() throws Exception  {
    this(new YarnConfiguration());
  }

  /**
   * Helper function to print out usage
   */
  private void printUsage() {
    new HelpFormatter().printHelp("Client", opts);
  }

  /**
   * Parse command line options and store them in this client.
   *
   * @param args Raw command line arguments
   * @return true if the client should go on to {@link #run()}; false when
   *         only the usage text was requested via {@code -help}
   * @throws ParseException if the arguments cannot be parsed against the options
   * @throws IllegalArgumentException if no arguments are given, a required
   *         option is missing, or a value is out of range or not a number
   */
  public boolean init(String[] args) throws ParseException {

    // Fail fast: nothing useful can be parsed from an empty command line.
    if (args.length == 0) {
      throw new IllegalArgumentException("No args specified for client to initialize");
    }

    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (cliParser.hasOption("log_properties")) {
      // Named differently from the static log4jPath constant to avoid shadowing it.
      String customLog4jPath = cliParser.getOptionValue("log_properties");
      try {
        Log4jPropertyHelper.updateLog4jConfiguration(Client.class, customLog4jPath);
      } catch (Exception e) {
        // Best effort: keep going with the default logging configuration.
        LOG.warn("Can not set up custom log4j properties. " + e);
      }
    }

    if (cliParser.hasOption("help")) {
      printUsage();
      return false;
    }

    if (cliParser.hasOption("debug")) {
      debugFlag = true;
    }

    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
      LOG.info("keep_containers_across_application_attempts");
      keepContainers = true;
    }

    appName = cliParser.getOptionValue("appname", "DistributedShell");
    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
    amQueue = cliParser.getOptionValue("queue", "default");
    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));

    if (amMemory < 0) {
      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
          + " Specified memory=" + amMemory);
    }
    if (amVCores < 0) {
      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
          + " Specified virtual cores=" + amVCores);
    }

    if (!cliParser.hasOption("jar")) {
      throw new IllegalArgumentException("No jar file specified for application master");
    }

    appMasterJar = cliParser.getOptionValue("jar");

    // Exactly one of shell_command / shell_script must be supplied.
    boolean hasShellCommand = cliParser.hasOption("shell_command");
    boolean hasShellScript = cliParser.hasOption("shell_script");
    if (!hasShellCommand && !hasShellScript) {
      throw new IllegalArgumentException(
          "No shell command or shell script specified to be executed by application master");
    } else if (hasShellCommand && hasShellScript) {
      throw new IllegalArgumentException("Can not specify shell_command option " +
          "and shell_script option at the same time");
    } else if (hasShellCommand) {
      shellCommand = cliParser.getOptionValue("shell_command");
    } else {
      shellScriptPath = cliParser.getOptionValue("shell_script");
    }
    if (cliParser.hasOption("shell_args")) {
      shellArgs = cliParser.getOptionValues("shell_args");
    }
    if (cliParser.hasOption("shell_env")) {
      String[] envs = cliParser.getOptionValues("shell_env");
      for (String env : envs) {
        env = env.trim();
        int index = env.indexOf('=');
        if (index == -1) {
          // "KEY" with no '=' defines the variable with an empty value.
          shellEnv.put(env, "");
          continue;
        }
        String key = env.substring(0, index);
        String val = "";
        if (index < (env.length()-1)) {
          val = env.substring(index+1);
        }
        shellEnv.put(key, val);
      }
    }
    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));

    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));

    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
          + " exiting."
          + " Specified containerMemory=" + containerMemory
          + ", containerVirtualCores=" + containerVirtualCores
          + ", numContainer=" + numContainers);
    }

    // clientTimeout is a long, so parse it as one; values above Integer.MAX_VALUE ms are valid.
    clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));

    log4jPropFile = cliParser.getOptionValue("log_properties", "");

    return true;
  }

  /**
   * Main run function for the client: logs cluster information, uploads the
   * application master resources, submits the application and monitors it
   * until it finishes or the client timeout expires.
   *
   * @return true if application completed successfully
   * @throws IOException
   * @throws YarnException
   * @throws IllegalArgumentException if no queue information is returned for the configured queue
   */
  public boolean run() throws IOException, YarnException {

    LOG.info("Running Client");
    yarnClient.start();

    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
    LOG.info("Got Cluster metric info from ASM"
        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
        NodeState.RUNNING);
    LOG.info("Got Cluster node info from ASM");
    for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress=" + node.getHttpAddress()
          + ", nodeRackName=" + node.getRackName()
          + ", nodeNumContainers=" + node.getNumContainers());
    }

    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
    // queueInfo is dereferenced right below; report a missing queue by name
    // instead of failing with a bare NullPointerException.
    if (queueInfo == null) {
      throw new IllegalArgumentException(String.format(
          "Queue %s not present in scheduler configuration.", this.amQueue));
    }
    LOG.info("Queue info"
        + ", queueName=" + queueInfo.getQueueName()
        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
        + ", queueApplicationCount=" + queueInfo.getApplications().size()
        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
    for (QueueUserACLInfo aclInfo : listAclInfo) {
      for (QueueACL userAcl : aclInfo.getUserAcls()) {
        LOG.info("User ACL Info for Queue"
            + ", queueName=" + aclInfo.getQueueName()
            + ", userAcl=" + userAcl.name());
      }
    }

    // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    // TODO get min/max resource capabilities from RM and change memory ask if needed
    // If we do not have min/max, we may not be able to correctly request
    // the required resources from the RM for the app master
    // Memory ask has to be a multiple of min and less than max.
    // Dump out information about cluster capability as seen by the resource manager
    int maxMem = appResponse.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    // A resource ask cannot exceed the max.
    if (amMemory > maxMem) {
      LOG.info("AM memory specified above max threshold of cluster. Using max value."
          + ", specified=" + amMemory
          + ", max=" + maxMem);
      amMemory = maxMem;
    }

    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);

    if (amVCores > maxVCores) {
      LOG.info("AM virtual cores specified above max threshold of cluster. "
          + "Using max value." + ", specified=" + amVCores
          + ", max=" + maxVCores);
      amVCores = maxVCores;
    }

    // set the application name
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();

    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Copy the application master jar to the filesystem
    // Create a local resource to point to the destination jar path
    FileSystem fs = FileSystem.get(conf);
    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
        localResources, null);

    // Set the log4j properties if needed
    if (!log4jPropFile.isEmpty()) {
      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
          localResources, null);
    }

    // The shell script has to be made available on the final container(s)
    // where it will be executed.
    // To do this, we need to first copy into the filesystem that is visible
    // to the yarn framework.
    // We do not need to set this as a local resource for the application
    // master as the application master does not need it.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
      Path shellSrc = new Path(shellScriptPath);
      String shellPathSuffix =
          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
      Path shellDst =
          new Path(fs.getHomeDirectory(), shellPathSuffix);
      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
      hdfsShellScriptLocation = shellDst.toUri().toString();
      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
      hdfsShellScriptLen = shellFileStatus.getLen();
      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }

    if (!shellCommand.isEmpty()) {
      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
          localResources, shellCommand);
    }

    if (shellArgs.length > 0) {
      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
          localResources, StringUtils.join(shellArgs, " "));
    }
    // Set local resource info into app master container launch context
    amContainer.setLocalResources(localResources);

    // Set the necessary security tokens as needed
    //amContainer.setContainerTokens(containerToken);

    // Set the env variables to be setup in the env where the application master will be run
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();

    // put location of shell script into env
    // using the env info, the application master will create the correct local resource for the
    // eventual containers that will be launched to execute the shell scripts
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

    // Add AppMaster.jar location to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
      classPathEnv.append(c.trim());
    }
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
      "./log4j.properties");

    // add the runtime classpath needed for tests to work
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      classPathEnv.append(':');
      classPathEnv.append(System.getProperty("java.class.path"));
    }

    env.put("CLASSPATH", classPathEnv.toString());

    amContainer.setEnvironment(env);

    // Set the necessary command to execute the application master.
    // The list is only touched by this thread, so a plain ArrayList is sufficient.
    List<CharSequence> vargs = new ArrayList<CharSequence>(30);

    // Set java executable command
    LOG.info("Setting up app master command");
    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
    // Set Xmx based on am memory size
    vargs.add("-Xmx" + amMemory + "m");
    // Set class name
    vargs.add(appMasterMainClass);
    // Set params for Application Master
    vargs.add("--container_memory " + String.valueOf(containerMemory));
    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
    vargs.add("--num_containers " + String.valueOf(numContainers));
    vargs.add("--priority " + String.valueOf(shellCmdPriority));

    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
      vargs.add("--debug");
    }

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

    // Get final command
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    amContainer.setCommands(commands);

    // Set up resource type requirements
    // For now, both memory and vcores are supported, so we set memory and
    // vcores requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    capability.setVirtualCores(amVCores);
    appContext.setResource(capability);

    // Service data is a binary blob that can be passed to the application
    // Not needed in this scenario
    // amContainer.setServiceData(serviceData);

    // Setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      Credentials credentials = new Credentials();
      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
      if (tokenRenewer == null || tokenRenewer.length() == 0) {
        throw new IOException(
          "Can't get Master Kerberos principal for the RM to use as renewer");
      }

      // For now, only getting tokens for the default file-system.
      final Token<?>[] tokens =
          fs.addDelegationTokens(tokenRenewer, credentials);
      if (tokens != null) {
        for (Token<?> token : tokens) {
          LOG.info("Got dt for " + fs.getUri() + "; " + token);
        }
      }
      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
      amContainer.setTokens(fsTokens);
    }

    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
    Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
    pri.setPriority(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue(amQueue);

    // Submit the application to the applications manager
    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
    // Ignore the response as either a valid response object is returned on success
    // or an exception thrown to denote some form of a failure
    LOG.info("Submitting application to ASM");

    yarnClient.submitApplication(appContext);

    // TODO
    // Try submitting the same request again
    // app submission failure?

    // Monitor the application
    return monitorApplication(appId);

  }

  /**
   * Monitor the submitted application for completion.
   * Kill application if time expires.
   *
   * <p>An interrupt received while sleeping between polls does not stop the
   * monitoring; it is remembered and the thread's interrupt status is
   * restored when this method exits.</p>
   *
   * @param appId Application Id of application to be monitored
   * @return true if application completed successfully
   * @throws YarnException
   * @throws IOException
   */
  private boolean monitorApplication(ApplicationId appId)
      throws YarnException, IOException {

    // Re-interrupting inside the loop would make every later sleep() return
    // immediately, so the interrupt is recorded and restored once at exit.
    boolean interrupted = false;
    try {
      while (true) {

        // Check app status every 1 second.
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          LOG.debug("Thread sleep in monitoring loop interrupted");
          interrupted = true;
        }

        // Get application report for the appId we are interested in
        ApplicationReport report = yarnClient.getApplicationReport(appId);

        LOG.info("Got application report from ASM for"
            + ", appId=" + appId.getId()
            + ", clientToAMToken=" + report.getClientToAMToken()
            + ", appDiagnostics=" + report.getDiagnostics()
            + ", appMasterHost=" + report.getHost()
            + ", appQueue=" + report.getQueue()
            + ", appMasterRpcPort=" + report.getRpcPort()
            + ", appStartTime=" + report.getStartTime()
            + ", yarnAppState=" + report.getYarnApplicationState().toString()
            + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
            + ", appTrackingUrl=" + report.getTrackingUrl()
            + ", appUser=" + report.getUser());

        YarnApplicationState state = report.getYarnApplicationState();
        FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
        if (YarnApplicationState.FINISHED == state) {
          if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
            LOG.info("Application has completed successfully. Breaking monitoring loop");
            return true;
          }
          else {
            LOG.info("Application did finished unsuccessfully."
                + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                + ". Breaking monitoring loop");
            return false;
          }
        }
        else if (YarnApplicationState.KILLED == state
            || YarnApplicationState.FAILED == state) {
          LOG.info("Application did not finish."
              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
              + ". Breaking monitoring loop");
          return false;
        }

        // The timeout is measured from client construction, not from submission.
        if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
          LOG.info("Reached client specified timeout for application. Killing application");
          forceKillApplication(appId);
          return false;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }

  }

  /**
   * Kill a submitted application by sending a call to the ASM
   * @param appId Application Id to be killed.
   * @throws YarnException
   * @throws IOException
   */
  private void forceKillApplication(ApplicationId appId)
      throws YarnException, IOException {
    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
    // the same time.
    // If yes, can we kill a particular attempt only?

    // Response can be ignored as it is non-null on success or
    // throws an exception in case of failures
    yarnClient.killApplication(appId);
  }

  /**
   * Places a file under {@code <home>/<appName>/<appId>/<fileDstPath>} on the
   * given file system and registers it in {@code localResources} under the
   * key {@code fileDstPath}.
   *
   * @param fs File system to write to
   * @param fileSrcPath Local file to copy, or null to write {@code resources} instead
   * @param fileDstPath File name below the application directory; also the local resource key
   * @param appId Application id, used as a path component
   * @param localResources Map that receives the new local resource
   * @param resources Content written with {@code writeUTF} when {@code fileSrcPath} is null
   * @throws IOException if the file cannot be written, copied or stat'ed
   */
  private void addToLocalResources(FileSystem fs, String fileSrcPath,
      String fileDstPath, String appId, Map<String, LocalResource> localResources,
      String resources) throws IOException {
    String suffix =
        appName + "/" + appId + "/" + fileDstPath;
    Path dst =
        new Path(fs.getHomeDirectory(), suffix);
    if (fileSrcPath == null) {
      FSDataOutputStream ostream = null;
      try {
        ostream = FileSystem
            .create(fs, dst, new FsPermission((short) 0710));
        ostream.writeUTF(resources);
        // Close explicitly so a failed close is reported instead of being
        // swallowed by closeQuietly before the file status is read below.
        ostream.close();
        ostream = null;
      } finally {
        // Only reached with an open stream when something above threw.
        IOUtils.closeQuietly(ostream);
      }
    } else {
      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
    }
    FileStatus scFileStatus = fs.getFileStatus(dst);
    LocalResource scRsrc =
        LocalResource.newInstance(
            ConverterUtils.getYarnUrlFromURI(dst.toUri()),
            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
            scFileStatus.getLen(), scFileStatus.getModificationTime());
    localResources.put(fileDstPath, scRsrc);
  }
}