001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Vector; 028 029import org.apache.commons.cli.CommandLine; 030import org.apache.commons.cli.GnuParser; 031import org.apache.commons.cli.HelpFormatter; 032import org.apache.commons.cli.Option; 033import org.apache.commons.cli.Options; 034import org.apache.commons.cli.ParseException; 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.apache.hadoop.classification.InterfaceAudience; 040import org.apache.hadoop.classification.InterfaceStability; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.permission.FsPermission; 047import org.apache.hadoop.io.DataOutputBuffer; 048import org.apache.hadoop.security.Credentials; 049import org.apache.hadoop.security.UserGroupInformation; 050import org.apache.hadoop.security.token.Token; 051import org.apache.hadoop.yarn.api.ApplicationClientProtocol; 052import org.apache.hadoop.yarn.api.ApplicationConstants; 053import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 054import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 055import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; 056import org.apache.hadoop.yarn.api.records.ApplicationId; 057import org.apache.hadoop.yarn.api.records.ApplicationReport; 058import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 059import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 060import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 061import org.apache.hadoop.yarn.api.records.LocalResource; 062import org.apache.hadoop.yarn.api.records.LocalResourceType; 063import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 064import org.apache.hadoop.yarn.api.records.NodeReport; 065import org.apache.hadoop.yarn.api.records.NodeState; 066import org.apache.hadoop.yarn.api.records.Priority; 067import org.apache.hadoop.yarn.api.records.QueueACL; 068import org.apache.hadoop.yarn.api.records.QueueInfo; 069import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; 070import org.apache.hadoop.yarn.api.records.Resource; 071import org.apache.hadoop.yarn.api.records.URL; 072import org.apache.hadoop.yarn.api.records.YarnApplicationState; 073import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; 074import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; 075import org.apache.hadoop.yarn.client.api.TimelineClient; 076import org.apache.hadoop.yarn.client.api.YarnClient; 077import org.apache.hadoop.yarn.client.api.YarnClientApplication; 078import org.apache.hadoop.yarn.conf.YarnConfiguration; 079import org.apache.hadoop.yarn.exceptions.YarnException; 080import org.apache.hadoop.yarn.util.ConverterUtils; 081import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 082 083/** 084 * Client for Distributed Shell application submission to YARN. 085 * 086 * <p> The distributed shell client allows an application master to be launched that in turn would run 087 * the provided shell command on a set of containers. </p> 088 * 089 * <p>This client is meant to act as an example on how to write yarn-based applications. </p> 090 * 091 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 092 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 093 * provides a way for the client to get access to cluster information and to request for a 094 * new {@link ApplicationId}. <p> 095 * 096 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 097 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 098 * and application name, the priority assigned to the application and the queue 099 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} 100 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 101 * the {@link ApplicationMaster} is launched. </p> 102 * 103 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 104 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 105 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 106 * {@link ApplicationMaster}. <p> 107 * 108 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 109 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 110 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 111 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p> 112 * 113 */ 114@InterfaceAudience.Public 115@InterfaceStability.Unstable 116public class Client { 117 118 private static final Log LOG = LogFactory.getLog(Client.class); 119 120 // Configuration 121 private Configuration conf; 122 private YarnClient yarnClient; 123 // Application master specific info to register a new Application with RM/ASM 124 private String appName = ""; 125 // App master priority 126 private int amPriority = 0; 127 // Queue for App master 128 private String amQueue = ""; 129 // Amt. of memory resource to request for to run the App Master 130 private long amMemory = 100; 131 // Amt. of virtual core resource to request for to run the App Master 132 private int amVCores = 1; 133 134 // Application master jar file 135 private String appMasterJar = ""; 136 // Main class to invoke application master 137 private final String appMasterMainClass; 138 139 // Shell command to be executed 140 private String shellCommand = ""; 141 // Location of shell script 142 private String shellScriptPath = ""; 143 // Args to be passed to the shell command 144 private String[] shellArgs = new String[] {}; 145 // Env variables to be setup for the shell command 146 private Map<String, String> shellEnv = new HashMap<String, String>(); 147 // Shell Command Container priority 148 private int shellCmdPriority = 0; 149 150 // Amt of memory to request for container in which shell script will be executed 151 private int containerMemory = 10; 152 // Amt. of virtual cores to request for container in which shell script will be executed 153 private int containerVirtualCores = 1; 154 // No. of containers in which the shell script needs to be executed 155 private int numContainers = 1; 156 private String nodeLabelExpression = null; 157 158 // log4j.properties file 159 // if available, add to local resources and set into classpath 160 private String log4jPropFile = ""; 161 162 // Start time for client 163 private final long clientStartTime = System.currentTimeMillis(); 164 // Timeout threshold for client. Kill app after time interval expires. 165 private long clientTimeout = 600000; 166 167 // flag to indicate whether to keep containers across application attempts. 168 private boolean keepContainers = false; 169 170 private long attemptFailuresValidityInterval = -1; 171 172 // Debug flag 173 boolean debugFlag = false; 174 175 // Timeline domain ID 176 private String domainId = null; 177 178 // Flag to indicate whether to create the domain of the given ID 179 private boolean toCreateDomain = false; 180 181 // Timeline domain reader access control 182 private String viewACLs = null; 183 184 // Timeline domain writer access control 185 private String modifyACLs = null; 186 187 // Command line options 188 private Options opts; 189 190 private static final String shellCommandPath = "shellCommands"; 191 private static final String shellArgsPath = "shellArgs"; 192 private static final String appMasterJarPath = "AppMaster.jar"; 193 // Hardcoded path to custom log_properties 194 private static final String log4jPath = "log4j.properties"; 195 196 public static final String SCRIPT_PATH = "ExecScript"; 197 198 /** 199 * @param args Command line arguments 200 */ 201 public static void main(String[] args) { 202 boolean result = false; 203 try { 204 Client client = new Client(); 205 LOG.info("Initializing Client"); 206 try { 207 boolean doRun = client.init(args); 208 if (!doRun) { 209 System.exit(0); 210 } 211 } catch (IllegalArgumentException e) { 212 System.err.println(e.getLocalizedMessage()); 213 client.printUsage(); 214 System.exit(-1); 215 } 216 result = client.run(); 217 } catch (Throwable t) { 218 LOG.fatal("Error running Client", t); 219 System.exit(1); 220 } 221 if (result) { 222 LOG.info("Application completed successfully"); 223 System.exit(0); 224 } 225 LOG.error("Application failed to complete successfully"); 226 System.exit(2); 227 } 228 229 /** 230 */ 231 public Client(Configuration conf) throws Exception { 232 this( 233 "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", 234 conf); 235 } 236 237 Client(String appMasterMainClass, Configuration conf) { 238 this.conf = conf; 239 this.appMasterMainClass = appMasterMainClass; 240 yarnClient = YarnClient.createYarnClient(); 241 yarnClient.init(conf); 242 opts = new Options(); 243 opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); 244 opts.addOption("priority", true, "Application Priority. Default 0"); 245 opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); 246 opts.addOption("timeout", true, "Application timeout in milliseconds"); 247 opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); 248 opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); 249 opts.addOption("jar", true, "Jar file containing the application master"); 250 opts.addOption("shell_command", true, "Shell command to be executed by " + 251 "the Application Master. Can only specify either --shell_command " + 252 "or --shell_script"); 253 opts.addOption("shell_script", true, "Location of the shell script to be " + 254 "executed. Can only specify either --shell_command or --shell_script"); 255 opts.addOption("shell_args", true, "Command line args for the shell script." + 256 "Multiple args can be separated by empty space."); 257 opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES); 258 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); 259 opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); 260 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); 261 opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); 262 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); 263 opts.addOption("log_properties", true, "log4j.properties file"); 264 opts.addOption("keep_containers_across_application_attempts", false, 265 "Flag to indicate whether to keep containers across application attempts." + 266 " If the flag is true, running containers will not be killed when" + 267 " application attempt fails and these containers will be retrieved by" + 268 " the new application attempt "); 269 opts.addOption("attempt_failures_validity_interval", true, 270 "when attempt_failures_validity_interval in milliseconds is set to > 0," + 271 "the failure number will not take failures which happen out of " + 272 "the validityInterval into failure count. " + 273 "If failure count reaches to maxAppAttempts, " + 274 "the application will be failed."); 275 opts.addOption("debug", false, "Dump out debug information"); 276 opts.addOption("domain", true, "ID of the timeline domain where the " 277 + "timeline entities will be put"); 278 opts.addOption("view_acls", true, "Users and groups that allowed to " 279 + "view the timeline entities in the given domain"); 280 opts.addOption("modify_acls", true, "Users and groups that allowed to " 281 + "modify the timeline entities in the given domain"); 282 opts.addOption("create", false, "Flag to indicate whether to create the " 283 + "domain specified with -domain."); 284 opts.addOption("help", false, "Print usage"); 285 opts.addOption("node_label_expression", true, 286 "Node label expression to determine the nodes" 287 + " where all the containers of this application" 288 + " will be allocated, \"\" means containers" 289 + " can be allocated anywhere, if you don't specify the option," 290 + " default node_label_expression of queue will be used."); 291 } 292 293 /** 294 */ 295 public Client() throws Exception { 296 this(new YarnConfiguration()); 297 } 298 299 /** 300 * Helper function to print out usage 301 */ 302 private void printUsage() { 303 new HelpFormatter().printHelp("Client", opts); 304 } 305 306 /** 307 * Parse command line options 308 * @param args Parsed command line options 309 * @return Whether the init was successful to run the client 310 * @throws ParseException 311 */ 312 public boolean init(String[] args) throws ParseException { 313 314 CommandLine cliParser = new GnuParser().parse(opts, args); 315 316 if (args.length == 0) { 317 throw new IllegalArgumentException("No args specified for client to initialize"); 318 } 319 320 if (cliParser.hasOption("log_properties")) { 321 String log4jPath = cliParser.getOptionValue("log_properties"); 322 try { 323 Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath); 324 } catch (Exception e) { 325 LOG.warn("Can not set up custom log4j properties. " + e); 326 } 327 } 328 329 if (cliParser.hasOption("help")) { 330 printUsage(); 331 return false; 332 } 333 334 if (cliParser.hasOption("debug")) { 335 debugFlag = true; 336 337 } 338 339 if (cliParser.hasOption("keep_containers_across_application_attempts")) { 340 LOG.info("keep_containers_across_application_attempts"); 341 keepContainers = true; 342 } 343 344 appName = cliParser.getOptionValue("appname", "DistributedShell"); 345 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); 346 amQueue = cliParser.getOptionValue("queue", "default"); 347 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100")); 348 amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); 349 350 if (amMemory < 0) { 351 throw new IllegalArgumentException("Invalid memory specified for application master, exiting." 352 + " Specified memory=" + amMemory); 353 } 354 if (amVCores < 0) { 355 throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." 356 + " Specified virtual cores=" + amVCores); 357 } 358 359 if (!cliParser.hasOption("jar")) { 360 throw new IllegalArgumentException("No jar file specified for application master"); 361 } 362 363 appMasterJar = cliParser.getOptionValue("jar"); 364 365 if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) { 366 throw new IllegalArgumentException( 367 "No shell command or shell script specified to be executed by application master"); 368 } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) { 369 throw new IllegalArgumentException("Can not specify shell_command option " + 370 "and shell_script option at the same time"); 371 } else if (cliParser.hasOption("shell_command")) { 372 shellCommand = cliParser.getOptionValue("shell_command"); 373 } else { 374 shellScriptPath = cliParser.getOptionValue("shell_script"); 375 } 376 if (cliParser.hasOption("shell_args")) { 377 shellArgs = cliParser.getOptionValues("shell_args"); 378 } 379 if (cliParser.hasOption("shell_env")) { 380 String envs[] = cliParser.getOptionValues("shell_env"); 381 for (String env : envs) { 382 env = env.trim(); 383 int index = env.indexOf('='); 384 if (index == -1) { 385 shellEnv.put(env, ""); 386 continue; 387 } 388 String key = env.substring(0, index); 389 String val = ""; 390 if (index < (env.length()-1)) { 391 val = env.substring(index+1); 392 } 393 shellEnv.put(key, val); 394 } 395 } 396 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); 397 398 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); 399 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); 400 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); 401 402 403 if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { 404 throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," 405 + " exiting." 406 + " Specified containerMemory=" + containerMemory 407 + ", containerVirtualCores=" + containerVirtualCores 408 + ", numContainer=" + numContainers); 409 } 410 411 nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); 412 413 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); 414 415 attemptFailuresValidityInterval = 416 Long.parseLong(cliParser.getOptionValue( 417 "attempt_failures_validity_interval", "-1")); 418 419 log4jPropFile = cliParser.getOptionValue("log_properties", ""); 420 421 // Get timeline domain options 422 if (cliParser.hasOption("domain")) { 423 domainId = cliParser.getOptionValue("domain"); 424 toCreateDomain = cliParser.hasOption("create"); 425 if (cliParser.hasOption("view_acls")) { 426 viewACLs = cliParser.getOptionValue("view_acls"); 427 } 428 if (cliParser.hasOption("modify_acls")) { 429 modifyACLs = cliParser.getOptionValue("modify_acls"); 430 } 431 } 432 433 return true; 434 } 435 436 /** 437 * Main run function for the client 438 * @return true if application completed successfully 439 * @throws IOException 440 * @throws YarnException 441 */ 442 public boolean run() throws IOException, YarnException { 443 444 LOG.info("Running Client"); 445 yarnClient.start(); 446 447 YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); 448 LOG.info("Got Cluster metric info from ASM" 449 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); 450 451 List<NodeReport> clusterNodeReports = yarnClient.getNodeReports( 452 NodeState.RUNNING); 453 LOG.info("Got Cluster node info from ASM"); 454 for (NodeReport node : clusterNodeReports) { 455 LOG.info("Got node report from ASM for" 456 + ", nodeId=" + node.getNodeId() 457 + ", nodeAddress=" + node.getHttpAddress() 458 + ", nodeRackName=" + node.getRackName() 459 + ", nodeNumContainers=" + node.getNumContainers()); 460 } 461 462 QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue); 463 LOG.info("Queue info" 464 + ", queueName=" + queueInfo.getQueueName() 465 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() 466 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() 467 + ", queueApplicationCount=" + queueInfo.getApplications().size() 468 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); 469 470 List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); 471 for (QueueUserACLInfo aclInfo : listAclInfo) { 472 for (QueueACL userAcl : aclInfo.getUserAcls()) { 473 LOG.info("User ACL Info for Queue" 474 + ", queueName=" + aclInfo.getQueueName() 475 + ", userAcl=" + userAcl.name()); 476 } 477 } 478 479 if (domainId != null && domainId.length() > 0 && toCreateDomain) { 480 prepareTimelineDomain(); 481 } 482 483 // Get a new application id 484 YarnClientApplication app = yarnClient.createApplication(); 485 GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); 486 // TODO get min/max resource capabilities from RM and change memory ask if needed 487 // If we do not have min/max, we may not be able to correctly request 488 // the required resources from the RM for the app master 489 // Memory ask has to be a multiple of min and less than max. 490 // Dump out information about cluster capability as seen by the resource manager 491 long maxMem = appResponse.getMaximumResourceCapability().getMemorySize(); 492 LOG.info("Max mem capability of resources in this cluster " + maxMem); 493 494 // A resource ask cannot exceed the max. 495 if (amMemory > maxMem) { 496 LOG.info("AM memory specified above max threshold of cluster. Using max value." 497 + ", specified=" + amMemory 498 + ", max=" + maxMem); 499 amMemory = maxMem; 500 } 501 502 int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); 503 LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); 504 505 if (amVCores > maxVCores) { 506 LOG.info("AM virtual cores specified above max threshold of cluster. " 507 + "Using max value." + ", specified=" + amVCores 508 + ", max=" + maxVCores); 509 amVCores = maxVCores; 510 } 511 512 // set the application name 513 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); 514 ApplicationId appId = appContext.getApplicationId(); 515 516 appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); 517 appContext.setApplicationName(appName); 518 519 if (attemptFailuresValidityInterval >= 0) { 520 appContext 521 .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); 522 } 523 524 // set local resources for the application master 525 // local files or archives as needed 526 // In this scenario, the jar file for the application master is part of the local resources 527 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 528 529 LOG.info("Copy App Master jar from local filesystem and add to local environment"); 530 // Copy the application master jar to the filesystem 531 // Create a local resource to point to the destination jar path 532 FileSystem fs = FileSystem.get(conf); 533 addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), 534 localResources, null); 535 536 // Set the log4j properties if needed 537 if (!log4jPropFile.isEmpty()) { 538 addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), 539 localResources, null); 540 } 541 542 // The shell script has to be made available on the final container(s) 543 // where it will be executed. 544 // To do this, we need to first copy into the filesystem that is visible 545 // to the yarn framework. 546 // We do not need to set this as a local resource for the application 547 // master as the application master does not need it. 548 String hdfsShellScriptLocation = ""; 549 long hdfsShellScriptLen = 0; 550 long hdfsShellScriptTimestamp = 0; 551 if (!shellScriptPath.isEmpty()) { 552 Path shellSrc = new Path(shellScriptPath); 553 String shellPathSuffix = 554 appName + "/" + appId.toString() + "/" + SCRIPT_PATH; 555 Path shellDst = 556 new Path(fs.getHomeDirectory(), shellPathSuffix); 557 fs.copyFromLocalFile(false, true, shellSrc, shellDst); 558 hdfsShellScriptLocation = shellDst.toUri().toString(); 559 FileStatus shellFileStatus = fs.getFileStatus(shellDst); 560 hdfsShellScriptLen = shellFileStatus.getLen(); 561 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); 562 } 563 564 if (!shellCommand.isEmpty()) { 565 addToLocalResources(fs, null, shellCommandPath, appId.toString(), 566 localResources, shellCommand); 567 } 568 569 if (shellArgs.length > 0) { 570 addToLocalResources(fs, null, shellArgsPath, appId.toString(), 571 localResources, StringUtils.join(shellArgs, " ")); 572 } 573 574 // Set the necessary security tokens as needed 575 //amContainer.setContainerTokens(containerToken); 576 577 // Set the env variables to be setup in the env where the application master will be run 578 LOG.info("Set the environment for the application master"); 579 Map<String, String> env = new HashMap<String, String>(); 580 581 // put location of shell script into env 582 // using the env info, the application master will create the correct local resource for the 583 // eventual containers that will be launched to execute the shell scripts 584 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); 585 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); 586 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); 587 if (domainId != null && domainId.length() > 0) { 588 env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); 589 } 590 591 // Add AppMaster.jar location to classpath 592 // At some point we should not be required to add 593 // the hadoop specific classpaths to the env. 594 // It should be provided out of the box. 595 // For now setting all required classpaths including 596 // the classpath to "." for the application jar 597 StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) 598 .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); 599 for (String c : conf.getStrings( 600 YarnConfiguration.YARN_APPLICATION_CLASSPATH, 601 YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { 602 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); 603 classPathEnv.append(c.trim()); 604 } 605 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( 606 "./log4j.properties"); 607 608 // add the runtime classpath needed for tests to work 609 if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { 610 classPathEnv.append(':'); 611 classPathEnv.append(System.getProperty("java.class.path")); 612 } 613 614 env.put("CLASSPATH", classPathEnv.toString()); 615 616 // Set the necessary command to execute the application master 617 Vector<CharSequence> vargs = new Vector<CharSequence>(30); 618 619 // Set java executable command 620 LOG.info("Setting up app master command"); 621 vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); 622 // Set Xmx based on am memory size 623 vargs.add("-Xmx" + amMemory + "m"); 624 // Set class name 625 vargs.add(appMasterMainClass); 626 // Set params for Application Master 627 vargs.add("--container_memory " + String.valueOf(containerMemory)); 628 vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); 629 vargs.add("--num_containers " + String.valueOf(numContainers)); 630 if (null != nodeLabelExpression) { 631 appContext.setNodeLabelExpression(nodeLabelExpression); 632 } 633 vargs.add("--priority " + String.valueOf(shellCmdPriority)); 634 635 for (Map.Entry<String, String> entry : shellEnv.entrySet()) { 636 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); 637 } 638 if (debugFlag) { 639 vargs.add("--debug"); 640 } 641 642 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); 643 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); 644 645 // Get final commmand 646 StringBuilder command = new StringBuilder(); 647 for (CharSequence str : vargs) { 648 command.append(str).append(" "); 649 } 650 651 LOG.info("Completed setting up app master command " + command.toString()); 652 List<String> commands = new ArrayList<String>(); 653 commands.add(command.toString()); 654 655 // Set up the container launch context for the application master 656 ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( 657 localResources, env, commands, null, null, null); 658 659 // Set up resource type requirements 660 // For now, both memory and vcores are supported, so we set memory and 661 // vcores requirements 662 Resource capability = Resource.newInstance(amMemory, amVCores); 663 appContext.setResource(capability); 664 665 // Service data is a binary blob that can be passed to the application 666 // Not needed in this scenario 667 // amContainer.setServiceData(serviceData); 668 669 // Setup security tokens 670 if (UserGroupInformation.isSecurityEnabled()) { 671 // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce 672 Credentials credentials = new Credentials(); 673 String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); 674 if (tokenRenewer == null || tokenRenewer.length() == 0) { 675 throw new IOException( 676 "Can't get Master Kerberos principal for the RM to use as renewer"); 677 } 678 679 // For now, only getting tokens for the default file-system. 680 final Token<?> tokens[] = 681 fs.addDelegationTokens(tokenRenewer, credentials); 682 if (tokens != null) { 683 for (Token<?> token : tokens) { 684 LOG.info("Got dt for " + fs.getUri() + "; " + token); 685 } 686 } 687 DataOutputBuffer dob = new DataOutputBuffer(); 688 credentials.writeTokenStorageToStream(dob); 689 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 690 amContainer.setTokens(fsTokens); 691 } 692 693 appContext.setAMContainerSpec(amContainer); 694 695 // Set the priority for the application master 696 // TODO - what is the range for priority? how to decide? 697 Priority pri = Priority.newInstance(amPriority); 698 appContext.setPriority(pri); 699 700 // Set the queue to which this application is to be submitted in the RM 701 appContext.setQueue(amQueue); 702 703 // Submit the application to the applications manager 704 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); 705 // Ignore the response as either a valid response object is returned on success 706 // or an exception thrown to denote some form of a failure 707 LOG.info("Submitting application to ASM"); 708 709 yarnClient.submitApplication(appContext); 710 711 // TODO 712 // Try submitting the same request again 713 // app submission failure? 714 715 // Monitor the application 716 return monitorApplication(appId); 717 718 } 719 720 /** 721 * Monitor the submitted application for completion. 722 * Kill application if time expires. 723 * @param appId Application Id of application to be monitored 724 * @return true if application completed successfully 725 * @throws YarnException 726 * @throws IOException 727 */ 728 private boolean monitorApplication(ApplicationId appId) 729 throws YarnException, IOException { 730 731 while (true) { 732 733 // Check app status every 1 second. 734 try { 735 Thread.sleep(1000); 736 } catch (InterruptedException e) { 737 LOG.debug("Thread sleep in monitoring loop interrupted"); 738 } 739 740 // Get application report for the appId we are interested in 741 ApplicationReport report = yarnClient.getApplicationReport(appId); 742 743 LOG.info("Got application report from ASM for" 744 + ", appId=" + appId.getId() 745 + ", clientToAMToken=" + report.getClientToAMToken() 746 + ", appDiagnostics=" + report.getDiagnostics() 747 + ", appMasterHost=" + report.getHost() 748 + ", appQueue=" + report.getQueue() 749 + ", appMasterRpcPort=" + report.getRpcPort() 750 + ", appStartTime=" + report.getStartTime() 751 + ", yarnAppState=" + report.getYarnApplicationState().toString() 752 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() 753 + ", appTrackingUrl=" + report.getTrackingUrl() 754 + ", appUser=" + report.getUser()); 755 756 YarnApplicationState state = report.getYarnApplicationState(); 757 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); 758 if (YarnApplicationState.FINISHED == state) { 759 if (FinalApplicationStatus.SUCCEEDED == dsStatus) { 760 LOG.info("Application has completed successfully. Breaking monitoring loop"); 761 return true; 762 } 763 else { 764 LOG.info("Application did finished unsuccessfully." 765 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 766 + ". Breaking monitoring loop"); 767 return false; 768 } 769 } 770 else if (YarnApplicationState.KILLED == state 771 || YarnApplicationState.FAILED == state) { 772 LOG.info("Application did not finish." 773 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 774 + ". Breaking monitoring loop"); 775 return false; 776 } 777 778 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { 779 LOG.info("Reached client specified timeout for application. Killing application"); 780 forceKillApplication(appId); 781 return false; 782 } 783 } 784 785 } 786 787 /** 788 * Kill a submitted application by sending a call to the ASM 789 * @param appId Application Id to be killed. 790 * @throws YarnException 791 * @throws IOException 792 */ 793 private void forceKillApplication(ApplicationId appId) 794 throws YarnException, IOException { 795 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 796 // the same time. 797 // If yes, can we kill a particular attempt only? 798 799 // Response can be ignored as it is non-null on success or 800 // throws an exception in case of failures 801 yarnClient.killApplication(appId); 802 } 803 804 private void addToLocalResources(FileSystem fs, String fileSrcPath, 805 String fileDstPath, String appId, Map<String, LocalResource> localResources, 806 String resources) throws IOException { 807 String suffix = 808 appName + "/" + appId + "/" + fileDstPath; 809 Path dst = 810 new Path(fs.getHomeDirectory(), suffix); 811 if (fileSrcPath == null) { 812 FSDataOutputStream ostream = null; 813 try { 814 ostream = FileSystem 815 .create(fs, dst, new FsPermission((short) 0710)); 816 ostream.writeUTF(resources); 817 } finally { 818 IOUtils.closeQuietly(ostream); 819 } 820 } else { 821 fs.copyFromLocalFile(new Path(fileSrcPath), dst); 822 } 823 FileStatus scFileStatus = fs.getFileStatus(dst); 824 LocalResource scRsrc = 825 LocalResource.newInstance( 826 URL.fromURI(dst.toUri()), 827 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 828 scFileStatus.getLen(), scFileStatus.getModificationTime()); 829 localResources.put(fileDstPath, scRsrc); 830 } 831 832 private void prepareTimelineDomain() { 833 TimelineClient timelineClient = null; 834 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 835 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 836 timelineClient = TimelineClient.createTimelineClient(); 837 timelineClient.init(conf); 838 timelineClient.start(); 839 } else { 840 LOG.warn("Cannot put the domain " + domainId + 841 " because the timeline service is not enabled"); 842 return; 843 } 844 try { 845 //TODO: we need to check and combine the existing timeline domain ACLs, 846 //but let's do it once we have client java library to query domains. 847 TimelineDomain domain = new TimelineDomain(); 848 domain.setId(domainId); 849 domain.setReaders( 850 viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); 851 domain.setWriters( 852 modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); 853 timelineClient.putDomain(domain); 854 LOG.info("Put the timeline domain: " + 855 TimelineUtils.dumpTimelineRecordtoJSON(domain)); 856 } catch (Exception e) { 857 LOG.error("Error when putting the timeline domain", e); 858 } finally { 859 timelineClient.stop(); 860 } 861 } 862}