/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.distributedshell;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

/**
 * Client for Distributed Shell application submission to YARN.
 *
 * <p> The distributed shell client allows an application master to be launched that in turn would run
 * the provided shell command on a set of containers. </p>
 *
 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
 *
 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
 *
 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
 * and application name, the priority assigned to the application and the queue
 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
 * the {@link ApplicationMaster} is launched. </p>
 *
 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
 *
 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class Client {

  private static final Log LOG = LogFactory.getLog(Client.class);

  // Configuration
  private Configuration conf;
  private YarnClient yarnClient;
  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // Amt. of memory resource to request for to run the App Master
  private int amMemory = 10;
  // Amt. of virtual core resource to request for to run the App Master
  private int amVCores = 1;

  // Application master jar file
  private String appMasterJar = "";
  // Main class to invoke application master
  private final String appMasterMainClass;

  // Shell command to be executed
  private String shellCommand = "";
  // Location of shell script
  private String shellScriptPath = "";
  // Args to be passed to the shell command
  private String[] shellArgs = new String[] {};
  // Env variables to be setup for the shell command
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority
  private int shellCmdPriority = 0;

  // Amt of memory to request for container in which shell script will be executed
  private int containerMemory = 10;
  // Amt. of virtual cores to request for container in which shell script will be executed
  private int containerVirtualCores = 1;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;
  private String nodeLabelExpression = null;

  // log4j.properties file
  // if available, add to local resources and set into classpath
  private String log4jPropFile = "";

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // flag to indicate whether to keep containers across application attempts.
  private boolean keepContainers = false;

  private long attemptFailuresValidityInterval = -1;

  // Debug flag
  boolean debugFlag = false;

  // Timeline domain ID
  private String domainId = null;

  // Flag to indicate whether to create the domain of the given ID
  private boolean toCreateDomain = false;

  // Timeline domain reader access control
  private String viewACLs = null;

  // Timeline domain writer access control
  private String modifyACLs = null;

  // Command line options
  private Options opts;

  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  public static final String SCRIPT_PATH = "ExecScript";

  /**
   * Command line entry point. Exits the JVM with status 0 on success (or when
   * only usage was requested), -1 on invalid arguments, 1 on an unexpected
   * error and 2 when the application did not complete successfully.
   *
   * @param args Command line arguments
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      Client client = new Client();
      LOG.info("Initializing Client");
      try {
        boolean doRun = client.init(args);
        if (!doRun) {
          System.exit(0);
        }
      } catch (IllegalArgumentException e) {
        System.err.println(e.getLocalizedMessage());
        client.printUsage();
        System.exit(-1);
      }
      result = client.run();
    } catch (Throwable t) {
      LOG.fatal("Error running Client", t);
      System.exit(1);
    }
    if (result) {
      LOG.info("Application completed successfully");
      System.exit(0);
    }
    LOG.error("Application failed to complete successfully");
    System.exit(2);
  }

  /**
   * Creates a client that launches the default distributed shell
   * <code>ApplicationMaster</code> class.
   *
   * @param conf Configuration used to initialize the YARN client
   * @throws Exception declared for callers; see the delegated constructor
   */
  public Client(Configuration conf) throws Exception {
    this(
        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
        conf);
  }

  /**
   * Creates a client for the given application master main class, initializes
   * the YARN client and registers all supported command line options.
   *
   * @param appMasterMainClass Fully qualified main class of the application master
   * @param conf Configuration used to initialize the YARN client
   */
  Client(String appMasterMainClass, Configuration conf) {
    this.conf = conf;
    this.appMasterMainClass = appMasterMainClass;
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    opts = new Options();
    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
    opts.addOption("priority", true, "Application Priority. Default 0");
    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
    opts.addOption("timeout", true, "Application timeout in milliseconds");
    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
    opts.addOption("jar", true, "Jar file containing the application master");
    opts.addOption("shell_command", true, "Shell command to be executed by " +
        "the Application Master. Can only specify either --shell_command " +
        "or --shell_script");
    opts.addOption("shell_script", true, "Location of the shell script to be " +
        "executed. Can only specify either --shell_command or --shell_script");
    opts.addOption("shell_args", true, "Command line args for the shell script." +
        "Multiple args can be separated by empty space.");
    opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
    opts.addOption("log_properties", true, "log4j.properties file");
    opts.addOption("keep_containers_across_application_attempts", false,
        "Flag to indicate whether to keep containers across application attempts." +
        " If the flag is true, running containers will not be killed when" +
        " application attempt fails and these containers will be retrieved by" +
        " the new application attempt ");
    opts.addOption("attempt_failures_validity_interval", true,
        "when attempt_failures_validity_interval in milliseconds is set to > 0," +
        "the failure number will not take failures which happen out of " +
        "the validityInterval into failure count. " +
        "If failure count reaches to maxAppAttempts, " +
        "the application will be failed.");
    opts.addOption("debug", false, "Dump out debug information");
    opts.addOption("domain", true, "ID of the timeline domain where the "
        + "timeline entities will be put");
    opts.addOption("view_acls", true, "Users and groups that allowed to "
        + "view the timeline entities in the given domain");
    opts.addOption("modify_acls", true, "Users and groups that allowed to "
        + "modify the timeline entities in the given domain");
    opts.addOption("create", false, "Flag to indicate whether to create the "
        + "domain specified with -domain.");
    opts.addOption("help", false, "Print usage");
    opts.addOption("node_label_expression", true,
        "Node label expression to determine the nodes"
        + " where all the containers of this application"
        + " will be allocated, \"\" means containers"
        + " can be allocated anywhere, if you don't specify the option,"
        + " default node_label_expression of queue will be used.");
  }

  /**
   * Creates a client backed by a fresh {@link YarnConfiguration}.
   *
   * @throws Exception declared for callers; see the delegated constructor
   */
  public Client() throws Exception {
    this(new YarnConfiguration());
  }

  /**
   * Helper function to print out usage
   */
  private void printUsage() {
    new HelpFormatter().printHelp("Client", opts);
  }

  /**
   * Parse command line options and populate this client's settings.
   *
   * @param args Command line arguments to parse
   * @return true if the client should go on to {@link #run()}; false if only
   *         usage was requested via <code>help</code>
   * @throws ParseException if the arguments cannot be parsed
   * @throws IllegalArgumentException if no arguments are given, a mandatory
   *         option is missing, options conflict, or a value is out of range
   *         or not a number
   */
  public boolean init(String[] args) throws ParseException {

    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (args.length == 0) {
      throw new IllegalArgumentException("No args specified for client to initialize");
    }

    if (cliParser.hasOption("log_properties")) {
      // Named differently from the static log4jPath constant to avoid shadowing it.
      String customLog4jFile = cliParser.getOptionValue("log_properties");
      try {
        Log4jPropertyHelper.updateLog4jConfiguration(Client.class, customLog4jFile);
      } catch (Exception e) {
        // Best effort: keep going with the default logging configuration,
        // but pass the throwable so the stack trace is not lost.
        LOG.warn("Can not set up custom log4j properties.", e);
      }
    }

    if (cliParser.hasOption("help")) {
      printUsage();
      return false;
    }

    if (cliParser.hasOption("debug")) {
      debugFlag = true;
    }

    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
      LOG.info("keep_containers_across_application_attempts");
      keepContainers = true;
    }

    appName = cliParser.getOptionValue("appname", "DistributedShell");
    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
    amQueue = cliParser.getOptionValue("queue", "default");
    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));

    if (amMemory < 0) {
      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
          + " Specified memory=" + amMemory);
    }
    if (amVCores < 0) {
      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
          + " Specified virtual cores=" + amVCores);
    }

    if (!cliParser.hasOption("jar")) {
      throw new IllegalArgumentException("No jar file specified for application master");
    }

    appMasterJar = cliParser.getOptionValue("jar");

    // Exactly one of shell_command / shell_script must be supplied.
    if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
      throw new IllegalArgumentException(
          "No shell command or shell script specified to be executed by application master");
    } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
      throw new IllegalArgumentException("Can not specify shell_command option " +
          "and shell_script option at the same time");
    } else if (cliParser.hasOption("shell_command")) {
      shellCommand = cliParser.getOptionValue("shell_command");
    } else {
      shellScriptPath = cliParser.getOptionValue("shell_script");
    }
    if (cliParser.hasOption("shell_args")) {
      shellArgs = cliParser.getOptionValues("shell_args");
    }
    if (cliParser.hasOption("shell_env")) {
      String[] envs = cliParser.getOptionValues("shell_env");
      for (String env : envs) {
        env = env.trim();
        int index = env.indexOf('=');
        if (index == -1) {
          // No '=' present: treat the whole token as a key with an empty value.
          shellEnv.put(env, "");
          continue;
        }
        String key = env.substring(0, index);
        String val = "";
        if (index < (env.length() - 1)) {
          val = env.substring(index + 1);
        }
        shellEnv.put(key, val);
      }
    }
    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));

    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));

    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
          + " exiting."
          + " Specified containerMemory=" + containerMemory
          + ", containerVirtualCores=" + containerVirtualCores
          + ", numContainer=" + numContainers);
    }

    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);

    // clientTimeout is a long; parse it as one so that timeouts larger than
    // Integer.MAX_VALUE milliseconds are accepted instead of rejected.
    clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));

    attemptFailuresValidityInterval =
        Long.parseLong(cliParser.getOptionValue(
            "attempt_failures_validity_interval", "-1"));

    log4jPropFile = cliParser.getOptionValue("log_properties", "");

    // Get timeline domain options
    if (cliParser.hasOption("domain")) {
      domainId = cliParser.getOptionValue("domain");
      toCreateDomain = cliParser.hasOption("create");
      if (cliParser.hasOption("view_acls")) {
        viewACLs = cliParser.getOptionValue("view_acls");
      }
      if (cliParser.hasOption("modify_acls")) {
        modifyACLs = cliParser.getOptionValue("modify_acls");
      }
    }

    return true;
  }

  /**
   * Main run function for the client: logs cluster information, stages the
   * application master's resources on the filesystem, builds the submission
   * context, submits the application and monitors it until it ends or the
   * client timeout expires.
   *
   * @return true if application completed successfully
   * @throws IOException
   * @throws YarnException
   */
  public boolean run() throws IOException, YarnException {

    LOG.info("Running Client");
    yarnClient.start();

    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
    LOG.info("Got Cluster metric info from ASM"
        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
        NodeState.RUNNING);
    LOG.info("Got Cluster node info from ASM");
    for (NodeReport node : clusterNodeReports) {
      LOG.info("Got node report from ASM for"
          + ", nodeId=" + node.getNodeId()
          + ", nodeAddress=" + node.getHttpAddress()
          + ", nodeRackName=" + node.getRackName()
          + ", nodeNumContainers=" + node.getNumContainers());
    }

    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
    LOG.info("Queue info"
        + ", queueName=" + queueInfo.getQueueName()
        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
        + ", queueApplicationCount=" + queueInfo.getApplications().size()
        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());

    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
    for (QueueUserACLInfo aclInfo : listAclInfo) {
      for (QueueACL userAcl : aclInfo.getUserAcls()) {
        LOG.info("User ACL Info for Queue"
            + ", queueName=" + aclInfo.getQueueName()
            + ", userAcl=" + userAcl.name());
      }
    }

    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
      prepareTimelineDomain();
    }

    // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    // TODO get min/max resource capabilities from RM and change memory ask if needed
    // If we do not have min/max, we may not be able to correctly request
    // the required resources from the RM for the app master
    // Memory ask has to be a multiple of min and less than max.
    // Dump out information about cluster capability as seen by the resource manager
    int maxMem = appResponse.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    // A resource ask cannot exceed the max.
    if (amMemory > maxMem) {
      LOG.info("AM memory specified above max threshold of cluster. Using max value."
          + ", specified=" + amMemory
          + ", max=" + maxMem);
      amMemory = maxMem;
    }

    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);

    if (amVCores > maxVCores) {
      LOG.info("AM virtual cores specified above max threshold of cluster. "
          + "Using max value." + ", specified=" + amVCores
          + ", max=" + maxVCores);
      amVCores = maxVCores;
    }

    // set the application name
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();

    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);

    if (attemptFailuresValidityInterval >= 0) {
      appContext
          .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
    }

    // set local resources for the application master
    // local files or archives as needed
    // In this scenario, the jar file for the application master is part of the local resources
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Copy the application master jar to the filesystem
    // Create a local resource to point to the destination jar path
    FileSystem fs = FileSystem.get(conf);
    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
        localResources, null);

    // Set the log4j properties if needed
    if (!log4jPropFile.isEmpty()) {
      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
          localResources, null);
    }

    // The shell script has to be made available on the final container(s)
    // where it will be executed.
    // To do this, we need to first copy into the filesystem that is visible
    // to the yarn framework.
    // We do not need to set this as a local resource for the application
    // master as the application master does not need it.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
      Path shellSrc = new Path(shellScriptPath);
      String shellPathSuffix =
          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
      Path shellDst =
          new Path(fs.getHomeDirectory(), shellPathSuffix);
      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
      hdfsShellScriptLocation = shellDst.toUri().toString();
      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
      hdfsShellScriptLen = shellFileStatus.getLen();
      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }

    if (!shellCommand.isEmpty()) {
      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
          localResources, shellCommand);
    }

    if (shellArgs.length > 0) {
      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
          localResources, StringUtils.join(shellArgs, " "));
    }

    // Set the necessary security tokens as needed
    //amContainer.setContainerTokens(containerToken);

    // Set the env variables to be setup in the env where the application master will be run
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();

    // put location of shell script into env
    // using the env info, the application master will create the correct local resource for the
    // eventual containers that will be launched to execute the shell scripts
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
    if (domainId != null && domainId.length() > 0) {
      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
    }

    // Add AppMaster.jar location to classpath
    // At some point we should not be required to add
    // the hadoop specific classpaths to the env.
    // It should be provided out of the box.
    // For now setting all required classpaths including
    // the classpath to "." for the application jar
    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
        .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    for (String c : conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
      classPathEnv.append(c.trim());
    }
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
        "./log4j.properties");

    // add the runtime classpath needed for tests to work
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      classPathEnv.append(':');
      classPathEnv.append(System.getProperty("java.class.path"));
    }

    env.put("CLASSPATH", classPathEnv.toString());

    // Set the necessary command to execute the application master.
    // The list is local to this method, so a plain ArrayList is sufficient.
    List<CharSequence> vargs = new ArrayList<CharSequence>(30);

    // Set java executable command
    LOG.info("Setting up app master command");
    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
    // Set Xmx based on am memory size
    vargs.add("-Xmx" + amMemory + "m");
    // Set class name
    vargs.add(appMasterMainClass);
    // Set params for Application Master
    vargs.add("--container_memory " + String.valueOf(containerMemory));
    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
    vargs.add("--num_containers " + String.valueOf(numContainers));
    if (null != nodeLabelExpression) {
      appContext.setNodeLabelExpression(nodeLabelExpression);
    }
    vargs.add("--priority " + String.valueOf(shellCmdPriority));

    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
      vargs.add("--debug");
    }

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

    // Get final command
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }

    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, env, commands, null, null, null);

    // Set up resource type requirements
    // For now, both memory and vcores are supported, so we set memory and
    // vcores requirements
    Resource capability = Resource.newInstance(amMemory, amVCores);
    appContext.setResource(capability);

    // Service data is a binary blob that can be passed to the application
    // Not needed in this scenario
    // amContainer.setServiceData(serviceData);

    // Setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
      Credentials credentials = new Credentials();
      String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
      if (tokenRenewer == null || tokenRenewer.length() == 0) {
        throw new IOException(
            "Can't get Master Kerberos principal for the RM to use as renewer");
      }

      // For now, only getting tokens for the default file-system.
      final Token<?>[] tokens =
          fs.addDelegationTokens(tokenRenewer, credentials);
      if (tokens != null) {
        for (Token<?> token : tokens) {
          LOG.info("Got dt for " + fs.getUri() + "; " + token);
        }
      }
      DataOutputBuffer dob = new DataOutputBuffer();
      credentials.writeTokenStorageToStream(dob);
      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
      amContainer.setTokens(fsTokens);
    }

    appContext.setAMContainerSpec(amContainer);

    // Set the priority for the application master
    // TODO - what is the range for priority? how to decide?
    Priority pri = Priority.newInstance(amPriority);
    appContext.setPriority(pri);

    // Set the queue to which this application is to be submitted in the RM
    appContext.setQueue(amQueue);

    // Submit the application to the applications manager
    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
    // Ignore the response as either a valid response object is returned on success
    // or an exception thrown to denote some form of a failure
    LOG.info("Submitting application to ASM");

    yarnClient.submitApplication(appContext);

    // TODO
    // Try submitting the same request again
    // app submission failure?

    // Monitor the application
    return monitorApplication(appId);

  }

  /**
   * Monitor the submitted application for completion.
   * Kill application if time expires.
   * @param appId Application Id of application to be monitored
   * @return true if application completed successfully
   * @throws YarnException
   * @throws IOException
   */
  private boolean monitorApplication(ApplicationId appId)
      throws YarnException, IOException {

    while (true) {

      // Check app status every 1 second.
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // An interrupt only cuts this sleep short; the loop keeps polling.
        // The interrupt flag is not restored here because doing so would make
        // every subsequent sleep throw immediately and spin the loop.
        LOG.debug("Thread sleep in monitoring loop interrupted");
      }

      // Get application report for the appId we are interested in
      ApplicationReport report = yarnClient.getApplicationReport(appId);

      LOG.info("Got application report from ASM for"
          + ", appId=" + appId.getId()
          + ", clientToAMToken=" + report.getClientToAMToken()
          + ", appDiagnostics=" + report.getDiagnostics()
          + ", appMasterHost=" + report.getHost()
          + ", appQueue=" + report.getQueue()
          + ", appMasterRpcPort=" + report.getRpcPort()
          + ", appStartTime=" + report.getStartTime()
          + ", yarnAppState=" + report.getYarnApplicationState().toString()
          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
          + ", appTrackingUrl=" + report.getTrackingUrl()
          + ", appUser=" + report.getUser());

      YarnApplicationState state = report.getYarnApplicationState();
      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
      if (YarnApplicationState.FINISHED == state) {
        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
          LOG.info("Application has completed successfully. Breaking monitoring loop");
          return true;
        } else {
          LOG.info("Application did finished unsuccessfully."
              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
              + ". Breaking monitoring loop");
          return false;
        }
      } else if (YarnApplicationState.KILLED == state
          || YarnApplicationState.FAILED == state) {
        LOG.info("Application did not finish."
            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
            + ". Breaking monitoring loop");
        return false;
      }

      // The timeout is measured from construction of this client, not from submission.
      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
        LOG.info("Reached client specified timeout for application. Killing application");
        forceKillApplication(appId);
        return false;
      }
    }

  }

  /**
   * Kill a submitted application by sending a call to the ASM
   * @param appId Application Id to be killed.
   * @throws YarnException
   * @throws IOException
   */
  private void forceKillApplication(ApplicationId appId)
      throws YarnException, IOException {
    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
    // the same time.
    // If yes, can we kill a particular attempt only?

    // Response can be ignored as it is non-null on success or
    // throws an exception in case of failures
    yarnClient.killApplication(appId);
  }

  /**
   * Stages one resource under
   * <code>&lt;home dir&gt;/&lt;appName&gt;/&lt;appId&gt;/&lt;fileDstPath&gt;</code>
   * on the given filesystem and registers it in <code>localResources</code>
   * under the key <code>fileDstPath</code>.
   *
   * @param fs Filesystem to stage the resource on
   * @param fileSrcPath Local file to copy, or null to write
   *        <code>resources</code> as the file content instead
   * @param fileDstPath Destination file name, also used as the resource key
   * @param appId Application id used in the destination path
   * @param localResources Map that receives the new {@link LocalResource}
   * @param resources Content written (via <code>writeUTF</code>) when
   *        <code>fileSrcPath</code> is null; unused otherwise
   * @throws IOException if copying, writing, closing or stat-ing the file fails
   */
  private void addToLocalResources(FileSystem fs, String fileSrcPath,
      String fileDstPath, String appId, Map<String, LocalResource> localResources,
      String resources) throws IOException {
    String suffix =
        appName + "/" + appId + "/" + fileDstPath;
    Path dst =
        new Path(fs.getHomeDirectory(), suffix);
    if (fileSrcPath == null) {
      FSDataOutputStream ostream = null;
      try {
        ostream = FileSystem
            .create(fs, dst, new FsPermission((short) 0710));
        ostream.writeUTF(resources);
        // Close explicitly on the success path so that a failure to flush the
        // data surfaces as an IOException instead of being silently dropped.
        ostream.close();
        ostream = null;
      } finally {
        // Only reached with a non-null stream when an exception is already
        // propagating; a secondary close failure must not mask it.
        IOUtils.closeQuietly(ostream);
      }
    } else {
      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
    }
    FileStatus scFileStatus = fs.getFileStatus(dst);
    LocalResource scRsrc =
        LocalResource.newInstance(
            ConverterUtils.getYarnUrlFromURI(dst.toUri()),
            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
            scFileStatus.getLen(), scFileStatus.getModificationTime());
    localResources.put(fileDstPath, scRsrc);
  }

  /**
   * Puts the timeline domain identified by <code>domainId</code>, using the
   * configured view/modify ACLs (a single space when none are set). Does
   * nothing but log a warning when the timeline service is disabled; failures
   * while putting the domain are logged and not rethrown.
   */
  private void prepareTimelineDomain() {
    TimelineClient timelineClient = null;
    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
      timelineClient = TimelineClient.createTimelineClient();
      timelineClient.init(conf);
      timelineClient.start();
    } else {
      LOG.warn("Cannot put the domain " + domainId +
          " because the timeline service is not enabled");
      return;
    }
    try {
      //TODO: we need to check and combine the existing timeline domain ACLs,
      //but let's do it once we have client java library to query domains.
      TimelineDomain domain = new TimelineDomain();
      domain.setId(domainId);
      domain.setReaders(
          viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
      domain.setWriters(
          modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
      timelineClient.putDomain(domain);
      LOG.info("Put the timeline domain: " +
          TimelineUtils.dumpTimelineRecordtoJSON(domain));
    } catch (Exception e) {
      LOG.error("Error when putting the timeline domain", e);
    } finally {
      timelineClient.stop();
    }
  }
}