001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.applications.distributedshell; 020 021 import java.io.BufferedReader; 022 import java.io.DataInputStream; 023 import java.io.File; 024 import java.io.FileInputStream; 025 import java.io.IOException; 026 import java.io.StringReader; 027 import java.lang.reflect.UndeclaredThrowableException; 028 import java.net.URI; 029 import java.net.URISyntaxException; 030 import java.nio.ByteBuffer; 031 import java.security.PrivilegedExceptionAction; 032 import java.util.ArrayList; 033 import java.util.HashMap; 034 import java.util.Iterator; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.Vector; 038 import java.util.concurrent.ConcurrentHashMap; 039 import java.util.concurrent.ConcurrentMap; 040 import java.util.concurrent.atomic.AtomicInteger; 041 042 import org.apache.commons.cli.CommandLine; 043 import org.apache.commons.cli.GnuParser; 044 import org.apache.commons.cli.HelpFormatter; 045 import org.apache.commons.cli.Options; 046 import org.apache.commons.cli.ParseException; 047 import org.apache.commons.logging.Log; 048 import org.apache.commons.logging.LogFactory; 049 import 
org.apache.hadoop.classification.InterfaceAudience; 050 import org.apache.hadoop.classification.InterfaceAudience.Private; 051 import org.apache.hadoop.classification.InterfaceStability; 052 import org.apache.hadoop.conf.Configuration; 053 import org.apache.hadoop.fs.FileSystem; 054 import org.apache.hadoop.fs.Path; 055 import org.apache.hadoop.io.DataOutputBuffer; 056 import org.apache.hadoop.io.IOUtils; 057 import org.apache.hadoop.net.NetUtils; 058 import org.apache.hadoop.security.Credentials; 059 import org.apache.hadoop.security.UserGroupInformation; 060 import org.apache.hadoop.security.token.Token; 061 import org.apache.hadoop.util.ExitUtil; 062 import org.apache.hadoop.util.Shell; 063 import org.apache.hadoop.yarn.api.ApplicationConstants; 064 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 065 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 066 import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 067 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 068 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 069 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 070 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 071 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 072 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 073 import org.apache.hadoop.yarn.api.records.Container; 074 import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 075 import org.apache.hadoop.yarn.api.records.ContainerId; 076 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 077 import org.apache.hadoop.yarn.api.records.ContainerState; 078 import org.apache.hadoop.yarn.api.records.ContainerStatus; 079 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 080 import org.apache.hadoop.yarn.api.records.LocalResource; 081 import org.apache.hadoop.yarn.api.records.LocalResourceType; 
082 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 083 import org.apache.hadoop.yarn.api.records.NodeReport; 084 import org.apache.hadoop.yarn.api.records.Priority; 085 import org.apache.hadoop.yarn.api.records.Resource; 086 import org.apache.hadoop.yarn.api.records.ResourceRequest; 087 import org.apache.hadoop.yarn.api.records.URL; 088 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 089 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 090 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 091 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 092 import org.apache.hadoop.yarn.client.api.TimelineClient; 093 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 094 import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 095 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 096 import org.apache.hadoop.yarn.conf.YarnConfiguration; 097 import org.apache.hadoop.yarn.exceptions.YarnException; 098 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 099 import org.apache.hadoop.yarn.util.ConverterUtils; 100 import org.apache.log4j.LogManager; 101 102 import com.google.common.annotations.VisibleForTesting; 103 104 /** 105 * An ApplicationMaster for executing shell commands on a set of launched 106 * containers using the YARN framework. 107 * 108 * <p> 109 * This class is meant to act as an example on how to write yarn-based 110 * application masters. 111 * </p> 112 * 113 * <p> 114 * The ApplicationMaster is started on a container by the 115 * <code>ResourceManager</code>'s launcher. The first thing that the 116 * <code>ApplicationMaster</code> needs to do is to connect and register itself 117 * with the <code>ResourceManager</code>. 
The registration sets up information 118 * within the <code>ResourceManager</code> regarding what host:port the 119 * ApplicationMaster is listening on to provide any form of functionality to a 120 * client as well as a tracking url that a client can use to keep track of 121 * status/job history if needed. However, in the distributedshell, trackingurl 122 * and appMasterHost:appMasterRpcPort are not supported. 123 * </p> 124 * 125 * <p> 126 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 127 * <code>ResourceManager</code> at regular intervals to inform the 128 * <code>ResourceManager</code> that it is up and alive. The 129 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 130 * <code>ApplicationMaster</code> acts as a heartbeat. 131 * 132 * <p> 133 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 134 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 135 * required no. of containers using {@link ResourceRequest} with the necessary 136 * resource specifications such as node location, computational 137 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 138 * responds with an {@link AllocateResponse} that informs the 139 * <code>ApplicationMaster</code> of the set of newly allocated containers, 140 * completed containers as well as current state of available resources. 141 * </p> 142 * 143 * <p> 144 * For each allocated container, the <code>ApplicationMaster</code> can then set 145 * up the necessary launch context via {@link ContainerLaunchContext} to specify 146 * the allocated container id, local resources required by the executable, the 147 * environment to be setup for the executable, commands to execute, etc. and 148 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 149 * launch and execute the defined commands on the given allocated container. 
150 * </p> 151 * 152 * <p> 153 * The <code>ApplicationMaster</code> can monitor the launched container by 154 * either querying the <code>ResourceManager</code> using 155 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 156 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 157 * container's {@link ContainerId}. 158 * 159 * <p> 160 * After the job has been completed, the <code>ApplicationMaster</code> has to 161 * send a {@link FinishApplicationMasterRequest} to the 162 * <code>ResourceManager</code> to inform it that the 163 * <code>ApplicationMaster</code> has been completed. 164 */ 165 @InterfaceAudience.Public 166 @InterfaceStability.Unstable 167 public class ApplicationMaster { 168 169 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 170 171 @VisibleForTesting 172 @Private 173 public static enum DSEvent { 174 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 175 } 176 177 @VisibleForTesting 178 @Private 179 public static enum DSEntity { 180 DS_APP_ATTEMPT, DS_CONTAINER 181 } 182 183 // Configuration 184 private Configuration conf; 185 186 // Handle to communicate with the Resource Manager 187 @SuppressWarnings("rawtypes") 188 private AMRMClientAsync amRMClient; 189 190 // In both secure and non-secure modes, this points to the job-submitter. 
191 private UserGroupInformation appSubmitterUgi; 192 193 // Handle to communicate with the Node Manager 194 private NMClientAsync nmClientAsync; 195 // Listen to process the response from the Node Manager 196 private NMCallbackHandler containerListener; 197 198 // Application Attempt Id ( combination of attemptId and fail count ) 199 @VisibleForTesting 200 protected ApplicationAttemptId appAttemptID; 201 202 // TODO 203 // For status update for clients - yet to be implemented 204 // Hostname of the container 205 private String appMasterHostname = ""; 206 // Port on which the app master listens for status updates from clients 207 private int appMasterRpcPort = -1; 208 // Tracking url to which app master publishes info for clients to monitor 209 private String appMasterTrackingUrl = ""; 210 211 // App Master configuration 212 // No. of containers to run shell command on 213 @VisibleForTesting 214 protected int numTotalContainers = 1; 215 // Memory to request for the container on which the shell command will run 216 private int containerMemory = 10; 217 // VirtualCores to request for the container on which the shell command will run 218 private int containerVirtualCores = 1; 219 // Priority of the request 220 private int requestPriority; 221 222 // Counter for completed containers ( complete denotes successful or failed ) 223 private AtomicInteger numCompletedContainers = new AtomicInteger(); 224 // Allocated container count so that we know how many containers has the RM 225 // allocated to us 226 @VisibleForTesting 227 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 228 // Count of failed containers 229 private AtomicInteger numFailedContainers = new AtomicInteger(); 230 // Count of containers already requested from the RM 231 // Needed as once requested, we should not request for containers again. 232 // Only request for more if the original requirement changes. 
233 @VisibleForTesting 234 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 235 236 // Shell command to be executed 237 private String shellCommand = ""; 238 // Args to be passed to the shell command 239 private String shellArgs = ""; 240 // Env variables to be setup for the shell command 241 private Map<String, String> shellEnv = new HashMap<String, String>(); 242 243 // Location of shell script ( obtained from info set in env ) 244 // Shell script path in fs 245 private String scriptPath = ""; 246 // Timestamp needed for creating a local resource 247 private long shellScriptPathTimestamp = 0; 248 // File length needed for local resource 249 private long shellScriptPathLen = 0; 250 251 // Timeline domain ID 252 private String domainId = null; 253 254 // Hardcoded path to shell script in launch container's local env 255 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; 256 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH 257 + ".bat"; 258 259 // Hardcoded path to custom log_properties 260 private static final String log4jPath = "log4j.properties"; 261 262 private static final String shellCommandPath = "shellCommands"; 263 private static final String shellArgsPath = "shellArgs"; 264 265 private volatile boolean done; 266 267 private ByteBuffer allTokens; 268 269 // Launch threads 270 private List<Thread> launchThreads = new ArrayList<Thread>(); 271 272 // Timeline Client 273 private TimelineClient timelineClient; 274 275 private final String linux_bash_command = "bash"; 276 private final String windows_command = "cmd /c"; 277 278 /** 279 * @param args Command line args 280 */ 281 public static void main(String[] args) { 282 boolean result = false; 283 try { 284 ApplicationMaster appMaster = new ApplicationMaster(); 285 LOG.info("Initializing ApplicationMaster"); 286 boolean doRun = appMaster.init(args); 287 if (!doRun) { 288 System.exit(0); 289 } 290 appMaster.run(); 291 result = 
appMaster.finish(); 292 } catch (Throwable t) { 293 LOG.fatal("Error running ApplicationMaster", t); 294 LogManager.shutdown(); 295 ExitUtil.terminate(1, t); 296 } 297 if (result) { 298 LOG.info("Application Master completed successfully. exiting"); 299 System.exit(0); 300 } else { 301 LOG.info("Application Master failed. exiting"); 302 System.exit(2); 303 } 304 } 305 306 /** 307 * Dump out contents of $CWD and the environment to stdout for debugging 308 */ 309 private void dumpOutDebugInfo() { 310 311 LOG.info("Dump debug output"); 312 Map<String, String> envs = System.getenv(); 313 for (Map.Entry<String, String> env : envs.entrySet()) { 314 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 315 System.out.println("System env: key=" + env.getKey() + ", val=" 316 + env.getValue()); 317 } 318 319 BufferedReader buf = null; 320 try { 321 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : 322 Shell.execCommand("ls", "-al"); 323 buf = new BufferedReader(new StringReader(lines)); 324 String line = ""; 325 while ((line = buf.readLine()) != null) { 326 LOG.info("System CWD content: " + line); 327 System.out.println("System CWD content: " + line); 328 } 329 } catch (IOException e) { 330 e.printStackTrace(); 331 } finally { 332 IOUtils.cleanup(LOG, buf); 333 } 334 } 335 336 public ApplicationMaster() { 337 // Set up the configuration 338 conf = new YarnConfiguration(); 339 } 340 341 /** 342 * Parse command line options 343 * 344 * @param args Command line args 345 * @return Whether init successful and run should be invoked 346 * @throws ParseException 347 * @throws IOException 348 */ 349 public boolean init(String[] args) throws ParseException, IOException { 350 Options opts = new Options(); 351 opts.addOption("app_attempt_id", true, 352 "App Attempt ID. Not to be used unless for testing purposes"); 353 opts.addOption("shell_env", true, 354 "Environment for shell script. 
Specified as env_key=env_val pairs"); 355 opts.addOption("container_memory", true, 356 "Amount of memory in MB to be requested to run the shell command"); 357 opts.addOption("container_vcores", true, 358 "Amount of virtual cores to be requested to run the shell command"); 359 opts.addOption("num_containers", true, 360 "No. of containers on which the shell command needs to be executed"); 361 opts.addOption("priority", true, "Application Priority. Default 0"); 362 opts.addOption("debug", false, "Dump out debug information"); 363 364 opts.addOption("help", false, "Print usage"); 365 CommandLine cliParser = new GnuParser().parse(opts, args); 366 367 if (args.length == 0) { 368 printUsage(opts); 369 throw new IllegalArgumentException( 370 "No args specified for application master to initialize"); 371 } 372 373 //Check whether customer log4j.properties file exists 374 if (fileExist(log4jPath)) { 375 try { 376 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 377 log4jPath); 378 } catch (Exception e) { 379 LOG.warn("Can not set up custom log4j properties. 
" + e); 380 } 381 } 382 383 if (cliParser.hasOption("help")) { 384 printUsage(opts); 385 return false; 386 } 387 388 if (cliParser.hasOption("debug")) { 389 dumpOutDebugInfo(); 390 } 391 392 Map<String, String> envs = System.getenv(); 393 394 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 395 if (cliParser.hasOption("app_attempt_id")) { 396 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 397 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 398 } else { 399 throw new IllegalArgumentException( 400 "Application Attempt Id not set in the environment"); 401 } 402 } else { 403 ContainerId containerId = ConverterUtils.toContainerId(envs 404 .get(Environment.CONTAINER_ID.name())); 405 appAttemptID = containerId.getApplicationAttemptId(); 406 } 407 408 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 409 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 410 + " not set in the environment"); 411 } 412 if (!envs.containsKey(Environment.NM_HOST.name())) { 413 throw new RuntimeException(Environment.NM_HOST.name() 414 + " not set in the environment"); 415 } 416 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 417 throw new RuntimeException(Environment.NM_HTTP_PORT 418 + " not set in the environment"); 419 } 420 if (!envs.containsKey(Environment.NM_PORT.name())) { 421 throw new RuntimeException(Environment.NM_PORT.name() 422 + " not set in the environment"); 423 } 424 425 LOG.info("Application master for app" + ", appId=" 426 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 427 + appAttemptID.getApplicationId().getClusterTimestamp() 428 + ", attemptId=" + appAttemptID.getAttemptId()); 429 430 if (!fileExist(shellCommandPath) 431 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 432 throw new IllegalArgumentException( 433 "No shell command or shell script specified to be executed by application master"); 434 } 435 436 if (fileExist(shellCommandPath)) { 437 
shellCommand = readContent(shellCommandPath); 438 } 439 440 if (fileExist(shellArgsPath)) { 441 shellArgs = readContent(shellArgsPath); 442 } 443 444 if (cliParser.hasOption("shell_env")) { 445 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 446 for (String env : shellEnvs) { 447 env = env.trim(); 448 int index = env.indexOf('='); 449 if (index == -1) { 450 shellEnv.put(env, ""); 451 continue; 452 } 453 String key = env.substring(0, index); 454 String val = ""; 455 if (index < (env.length() - 1)) { 456 val = env.substring(index + 1); 457 } 458 shellEnv.put(key, val); 459 } 460 } 461 462 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 463 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 464 465 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 466 shellScriptPathTimestamp = Long.valueOf(envs 467 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 468 } 469 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 470 shellScriptPathLen = Long.valueOf(envs 471 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 472 } 473 if (!scriptPath.isEmpty() 474 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 475 LOG.error("Illegal values in env for shell script path" + ", path=" 476 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 477 + shellScriptPathTimestamp); 478 throw new IllegalArgumentException( 479 "Illegal values in env for shell script path"); 480 } 481 } 482 483 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 484 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 485 } 486 487 containerMemory = Integer.parseInt(cliParser.getOptionValue( 488 "container_memory", "10")); 489 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 490 "container_vcores", "1")); 491 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 492 "num_containers", "1")); 493 if (numTotalContainers == 0) { 494 throw new IllegalArgumentException( 
495 "Cannot run distributed shell with no containers"); 496 } 497 requestPriority = Integer.parseInt(cliParser 498 .getOptionValue("priority", "0")); 499 500 // Creating the Timeline Client 501 timelineClient = TimelineClient.createTimelineClient(); 502 timelineClient.init(conf); 503 timelineClient.start(); 504 505 return true; 506 } 507 508 /** 509 * Helper function to print usage 510 * 511 * @param opts Parsed command line options 512 */ 513 private void printUsage(Options opts) { 514 new HelpFormatter().printHelp("ApplicationMaster", opts); 515 } 516 517 /** 518 * Main run function for the application master 519 * 520 * @throws YarnException 521 * @throws IOException 522 */ 523 @SuppressWarnings({ "unchecked" }) 524 public void run() throws YarnException, IOException { 525 LOG.info("Starting ApplicationMaster"); 526 527 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 528 // are marked as LimitedPrivate 529 Credentials credentials = 530 UserGroupInformation.getCurrentUser().getCredentials(); 531 DataOutputBuffer dob = new DataOutputBuffer(); 532 credentials.writeTokenStorageToStream(dob); 533 // Now remove the AM->RM token so that containers cannot access it. 
534 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 535 LOG.info("Executing with tokens:"); 536 while (iter.hasNext()) { 537 Token<?> token = iter.next(); 538 LOG.info(token); 539 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 540 iter.remove(); 541 } 542 } 543 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 544 545 // Create appSubmitterUgi and add original tokens to it 546 String appSubmitterUserName = 547 System.getenv(ApplicationConstants.Environment.USER.name()); 548 appSubmitterUgi = 549 UserGroupInformation.createRemoteUser(appSubmitterUserName); 550 appSubmitterUgi.addCredentials(credentials); 551 552 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 553 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 554 555 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 556 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 557 amRMClient.init(conf); 558 amRMClient.start(); 559 560 containerListener = createNMCallbackHandler(); 561 nmClientAsync = new NMClientAsyncImpl(containerListener); 562 nmClientAsync.init(conf); 563 nmClientAsync.start(); 564 565 // Setup local RPC Server to accept status requests directly from clients 566 // TODO need to setup a protocol for client to be able to communicate to 567 // the RPC server 568 // TODO use the rpc port info to register with the RM for the client to 569 // send requests to this app master 570 571 // Register self with ResourceManager 572 // This will start heartbeating to the RM 573 appMasterHostname = NetUtils.getHostname(); 574 RegisterApplicationMasterResponse response = amRMClient 575 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 576 appMasterTrackingUrl); 577 // Dump out information about cluster capability as seen by the 578 // resource manager 579 int maxMem = response.getMaximumResourceCapability().getMemory(); 580 LOG.info("Max mem capabililty of resources in this cluster " 
+ maxMem); 581 582 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 583 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); 584 585 // A resource ask cannot exceed the max. 586 if (containerMemory > maxMem) { 587 LOG.info("Container memory specified above max threshold of cluster." 588 + " Using max value." + ", specified=" + containerMemory + ", max=" 589 + maxMem); 590 containerMemory = maxMem; 591 } 592 593 if (containerVirtualCores > maxVCores) { 594 LOG.info("Container virtual cores specified above max threshold of cluster." 595 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 596 + maxVCores); 597 containerVirtualCores = maxVCores; 598 } 599 600 List<Container> previousAMRunningContainers = 601 response.getContainersFromPreviousAttempts(); 602 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 603 + " previous attempts' running containers on AM registration."); 604 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 605 606 int numTotalContainersToRequest = 607 numTotalContainers - previousAMRunningContainers.size(); 608 // Setup ask for containers from RM 609 // Send request for containers to RM 610 // Until we get our fully allocated quota, we keep on polling RM for 611 // containers 612 // Keep looping until all the containers are launched and shell script 613 // executed on them ( regardless of success/failure). 
614 for (int i = 0; i < numTotalContainersToRequest; ++i) { 615 ContainerRequest containerAsk = setupContainerAskForRM(); 616 amRMClient.addContainerRequest(containerAsk); 617 } 618 numRequestedContainers.set(numTotalContainers); 619 620 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 621 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 622 } 623 624 @VisibleForTesting 625 NMCallbackHandler createNMCallbackHandler() { 626 return new NMCallbackHandler(this); 627 } 628 629 @VisibleForTesting 630 protected boolean finish() { 631 // wait for completion. 632 while (!done 633 && (numCompletedContainers.get() != numTotalContainers)) { 634 try { 635 Thread.sleep(200); 636 } catch (InterruptedException ex) {} 637 } 638 639 // Join all launched threads 640 // needed for when we time out 641 // and we need to release containers 642 for (Thread launchThread : launchThreads) { 643 try { 644 launchThread.join(10000); 645 } catch (InterruptedException e) { 646 LOG.info("Exception thrown in thread join: " + e.getMessage()); 647 e.printStackTrace(); 648 } 649 } 650 651 // When the application completes, it should stop all running containers 652 LOG.info("Application completed. Stopping running containers"); 653 nmClientAsync.stop(); 654 655 // When the application completes, it should send a finish application 656 // signal to the RM 657 LOG.info("Application completed. Signalling finish to RM"); 658 659 FinalApplicationStatus appStatus; 660 String appMessage = null; 661 boolean success = true; 662 if (numFailedContainers.get() == 0 && 663 numCompletedContainers.get() == numTotalContainers) { 664 appStatus = FinalApplicationStatus.SUCCEEDED; 665 } else { 666 appStatus = FinalApplicationStatus.FAILED; 667 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 668 + ", completed=" + numCompletedContainers.get() + ", allocated=" 669 + numAllocatedContainers.get() + ", failed=" 670 + numFailedContainers.get(); 671 LOG.info(appMessage); 672 success = false; 673 } 674 try { 675 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 676 } catch (YarnException ex) { 677 LOG.error("Failed to unregister application", ex); 678 } catch (IOException e) { 679 LOG.error("Failed to unregister application", e); 680 } 681 682 amRMClient.stop(); 683 684 return success; 685 } 686 687 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 688 @SuppressWarnings("unchecked") 689 @Override 690 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 691 LOG.info("Got response from RM for container ask, completedCnt=" 692 + completedContainers.size()); 693 for (ContainerStatus containerStatus : completedContainers) { 694 LOG.info(appAttemptID + " got container status for containerID=" 695 + containerStatus.getContainerId() + ", state=" 696 + containerStatus.getState() + ", exitStatus=" 697 + containerStatus.getExitStatus() + ", diagnostics=" 698 + containerStatus.getDiagnostics()); 699 700 // non complete containers should not be here 701 assert (containerStatus.getState() == ContainerState.COMPLETE); 702 703 // increment counters for completed/failed containers 704 int exitStatus = containerStatus.getExitStatus(); 705 if (0 != exitStatus) { 706 // container failed 707 if (ContainerExitStatus.ABORTED != exitStatus) { 708 // shell script failed 709 // counts as completed 710 numCompletedContainers.incrementAndGet(); 711 numFailedContainers.incrementAndGet(); 712 } else { 713 // container was killed by framework, possibly preempted 714 // we should re-try as the container was lost for some reason 715 numAllocatedContainers.decrementAndGet(); 716 numRequestedContainers.decrementAndGet(); 717 // we do not need to release the container as it would be done 
718 // by the RM 719 } 720 } else { 721 // nothing to do 722 // container completed successfully 723 numCompletedContainers.incrementAndGet(); 724 LOG.info("Container completed successfully." + ", containerId=" 725 + containerStatus.getContainerId()); 726 } 727 publishContainerEndEvent( 728 timelineClient, containerStatus, domainId, appSubmitterUgi); 729 } 730 731 // ask for more containers if any failed 732 int askCount = numTotalContainers - numRequestedContainers.get(); 733 numRequestedContainers.addAndGet(askCount); 734 735 if (askCount > 0) { 736 for (int i = 0; i < askCount; ++i) { 737 ContainerRequest containerAsk = setupContainerAskForRM(); 738 amRMClient.addContainerRequest(containerAsk); 739 } 740 } 741 742 if (numCompletedContainers.get() == numTotalContainers) { 743 done = true; 744 } 745 } 746 747 @Override 748 public void onContainersAllocated(List<Container> allocatedContainers) { 749 LOG.info("Got response from RM for container ask, allocatedCnt=" 750 + allocatedContainers.size()); 751 numAllocatedContainers.addAndGet(allocatedContainers.size()); 752 for (Container allocatedContainer : allocatedContainers) { 753 LOG.info("Launching shell command on a new container." 
754 + ", containerId=" + allocatedContainer.getId() 755 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 756 + ":" + allocatedContainer.getNodeId().getPort() 757 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 758 + ", containerResourceMemory" 759 + allocatedContainer.getResource().getMemory() 760 + ", containerResourceVirtualCores" 761 + allocatedContainer.getResource().getVirtualCores()); 762 // + ", containerToken" 763 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 764 765 LaunchContainerRunnable runnableLaunchContainer = 766 new LaunchContainerRunnable(allocatedContainer, containerListener); 767 Thread launchThread = new Thread(runnableLaunchContainer); 768 769 // launch and start the container on a separate thread to keep 770 // the main thread unblocked 771 // as all containers may not be allocated at one go. 772 launchThreads.add(launchThread); 773 launchThread.start(); 774 } 775 } 776 777 @Override 778 public void onShutdownRequest() { 779 done = true; 780 } 781 782 @Override 783 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 784 785 @Override 786 public float getProgress() { 787 // set progress to deliver to RM on next heartbeat 788 float progress = (float) numCompletedContainers.get() 789 / numTotalContainers; 790 return progress; 791 } 792 793 @Override 794 public void onError(Throwable e) { 795 done = true; 796 amRMClient.stop(); 797 } 798 } 799 800 @VisibleForTesting 801 static class NMCallbackHandler 802 implements NMClientAsync.CallbackHandler { 803 804 private ConcurrentMap<ContainerId, Container> containers = 805 new ConcurrentHashMap<ContainerId, Container>(); 806 private final ApplicationMaster applicationMaster; 807 808 public NMCallbackHandler(ApplicationMaster applicationMaster) { 809 this.applicationMaster = applicationMaster; 810 } 811 812 public void addContainer(ContainerId containerId, Container container) { 813 containers.putIfAbsent(containerId, container); 814 } 
815 816 @Override 817 public void onContainerStopped(ContainerId containerId) { 818 if (LOG.isDebugEnabled()) { 819 LOG.debug("Succeeded to stop Container " + containerId); 820 } 821 containers.remove(containerId); 822 } 823 824 @Override 825 public void onContainerStatusReceived(ContainerId containerId, 826 ContainerStatus containerStatus) { 827 if (LOG.isDebugEnabled()) { 828 LOG.debug("Container Status: id=" + containerId + ", status=" + 829 containerStatus); 830 } 831 } 832 833 @Override 834 public void onContainerStarted(ContainerId containerId, 835 Map<String, ByteBuffer> allServiceResponse) { 836 if (LOG.isDebugEnabled()) { 837 LOG.debug("Succeeded to start Container " + containerId); 838 } 839 Container container = containers.get(containerId); 840 if (container != null) { 841 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 842 } 843 ApplicationMaster.publishContainerStartEvent( 844 applicationMaster.timelineClient, container, 845 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 846 } 847 848 @Override 849 public void onStartContainerError(ContainerId containerId, Throwable t) { 850 LOG.error("Failed to start Container " + containerId); 851 containers.remove(containerId); 852 applicationMaster.numCompletedContainers.incrementAndGet(); 853 applicationMaster.numFailedContainers.incrementAndGet(); 854 } 855 856 @Override 857 public void onGetContainerStatusError( 858 ContainerId containerId, Throwable t) { 859 LOG.error("Failed to query the status of Container " + containerId); 860 } 861 862 @Override 863 public void onStopContainerError(ContainerId containerId, Throwable t) { 864 LOG.error("Failed to stop Container " + containerId); 865 containers.remove(containerId); 866 } 867 } 868 869 /** 870 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 871 * that will execute the shell command. 
872 */ 873 private class LaunchContainerRunnable implements Runnable { 874 875 // Allocated container 876 Container container; 877 878 NMCallbackHandler containerListener; 879 880 /** 881 * @param lcontainer Allocated container 882 * @param containerListener Callback handler of the container 883 */ 884 public LaunchContainerRunnable( 885 Container lcontainer, NMCallbackHandler containerListener) { 886 this.container = lcontainer; 887 this.containerListener = containerListener; 888 } 889 890 @Override 891 /** 892 * Connects to CM, sets up container launch context 893 * for shell command and eventually dispatches the container 894 * start request to the CM. 895 */ 896 public void run() { 897 LOG.info("Setting up container launch container for containerid=" 898 + container.getId()); 899 900 // Set the local resources 901 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 902 903 // The container for the eventual shell commands needs its own local 904 // resources too. 905 // In this scenario, if a shell script is specified, we need to have it 906 // copied and made available to the container. 907 if (!scriptPath.isEmpty()) { 908 Path renamedScriptPath = null; 909 if (Shell.WINDOWS) { 910 renamedScriptPath = new Path(scriptPath + ".bat"); 911 } else { 912 renamedScriptPath = new Path(scriptPath + ".sh"); 913 } 914 915 try { 916 // rename the script file based on the underlying OS syntax. 917 renameScriptFile(renamedScriptPath); 918 } catch (Exception e) { 919 LOG.error( 920 "Not able to add suffix (.bat/.sh) to the shell script filename", 921 e); 922 // We know we cannot continue launching the container 923 // so we should release it. 
924 numCompletedContainers.incrementAndGet(); 925 numFailedContainers.incrementAndGet(); 926 return; 927 } 928 929 URL yarnUrl = null; 930 try { 931 yarnUrl = ConverterUtils.getYarnUrlFromURI( 932 new URI(renamedScriptPath.toString())); 933 } catch (URISyntaxException e) { 934 LOG.error("Error when trying to use shell script path specified" 935 + " in env, path=" + renamedScriptPath, e); 936 // A failure scenario on bad input such as invalid shell script path 937 // We know we cannot continue launching the container 938 // so we should release it. 939 // TODO 940 numCompletedContainers.incrementAndGet(); 941 numFailedContainers.incrementAndGet(); 942 return; 943 } 944 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 945 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 946 shellScriptPathLen, shellScriptPathTimestamp); 947 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 948 ExecShellStringPath, shellRsrc); 949 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 950 } 951 952 // Set the necessary command to execute on the allocated container 953 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 954 955 // Set executable command 956 vargs.add(shellCommand); 957 // Set shell script path 958 if (!scriptPath.isEmpty()) { 959 vargs.add(Shell.WINDOWS ? 
ExecBatScripStringtPath 960 : ExecShellStringPath); 961 } 962 963 // Set args for the shell command if any 964 vargs.add(shellArgs); 965 // Add log redirect params 966 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 967 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 968 969 // Get final commmand 970 StringBuilder command = new StringBuilder(); 971 for (CharSequence str : vargs) { 972 command.append(str).append(" "); 973 } 974 975 List<String> commands = new ArrayList<String>(); 976 commands.add(command.toString()); 977 978 // Set up ContainerLaunchContext, setting local resource, environment, 979 // command and token for constructor. 980 981 // Note for tokens: Set up tokens for the container too. Today, for normal 982 // shell commands, the container in distribute-shell doesn't need any 983 // tokens. We are populating them mainly for NodeManagers to be able to 984 // download anyfiles in the distributed file-system. The tokens are 985 // otherwise also useful in cases, for e.g., when one is running a 986 // "hadoop dfs" command inside the distributed shell. 
987 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 988 localResources, shellEnv, commands, null, allTokens.duplicate(), null); 989 containerListener.addContainer(container.getId(), container); 990 nmClientAsync.startContainerAsync(container, ctx); 991 } 992 } 993 994 private void renameScriptFile(final Path renamedScriptPath) 995 throws IOException, InterruptedException { 996 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 997 @Override 998 public Void run() throws IOException { 999 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1000 fs.rename(new Path(scriptPath), renamedScriptPath); 1001 return null; 1002 } 1003 }); 1004 LOG.info("User " + appSubmitterUgi.getUserName() 1005 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1006 } 1007 1008 /** 1009 * Setup the request that will be sent to the RM for the container ask. 1010 * 1011 * @return the setup ResourceRequest to be sent to RM 1012 */ 1013 private ContainerRequest setupContainerAskForRM() { 1014 // setup requirements for hosts 1015 // using * as any host will do for the distributed shell app 1016 // set the priority for the request 1017 // TODO - what is the range for priority? how to decide? 
Priority priority = Priority.newInstance(requestPriority);

    // Resource requirements: memory and virtual cores are the two
    // dimensions set here.
    Resource capability = Resource.newInstance(containerMemory,
        containerVirtualCores);

    // No node or rack constraints are passed.
    ContainerRequest request =
        new ContainerRequest(capability, null, null, priority);
    LOG.info("Requested container ask: " + request.toString());
    return request;
  }

  /**
   * @param filePath path of the file to look for
   * @return whether a file or directory exists at the given path
   */
  private boolean fileExist(String filePath) {
    File candidate = new File(filePath);
    return candidate.exists();
  }

  /**
   * Reads a single string, encoded as expected by
   * {@link DataInputStream#readUTF()}, from the start of the given file.
   *
   * @param filePath path of the file to read
   * @return the decoded string
   * @throws IOException if the file cannot be opened or read
   */
  private String readContent(String filePath) throws IOException {
    DataInputStream input = null;
    try {
      input = new DataInputStream(new FileInputStream(filePath));
      String content = input.readUTF();
      return content;
    } finally {
      // Tolerates a null stream when opening the file failed.
      org.apache.commons.io.IOUtils.closeQuietly(input);
    }
  }

  /**
   * Publishes a container start event (node and resources of the
   * container) to the timeline client as the given user. A failure to
   * publish is logged and not propagated.
   *
   * @param timelineClient client used to put the entity
   * @param container the started container
   * @param domainId domain id set on the entity
   * @param ugi user that owns the entity and performs the put
   */
  private static void publishContainerStartEvent(
      final TimelineClient timelineClient, Container container, String domainId,
      UserGroupInformation ugi) {
    final TimelineEntity entity = newTimelineEntityFor(
        container.getId().toString(), DSEntity.DS_CONTAINER.toString(),
        domainId, ugi);
    TimelineEvent startEvent = new TimelineEvent();
    startEvent.setTimestamp(System.currentTimeMillis());
    startEvent.setEventType(DSEvent.DS_CONTAINER_START.toString());
    startEvent.addEventInfo("Node", container.getNodeId().toString());
    startEvent.addEventInfo("Resources", container.getResource().toString());
    entity.addEvent(startEvent);

    try {
      putEntityAsUser(timelineClient, entity, ugi);
    } catch (Exception e) {
      LOG.error("Container start event could not be published for "
          + container.getId().toString(),
          unwrapUndeclaredThrowable(e));
    }
  }

  /**
   * Publishes a container end event (state and exit status) to the
   * timeline client as the given user. A failure to publish is logged and
   * not propagated.
   *
   * @param timelineClient client used to put the entity
   * @param container status of the finished container
   * @param domainId domain id set on the entity
   * @param ugi user that owns the entity and performs the put
   */
  private static void publishContainerEndEvent(
      final TimelineClient timelineClient, ContainerStatus container,
      String domainId, UserGroupInformation ugi) {
    final TimelineEntity entity = newTimelineEntityFor(
        container.getContainerId().toString(),
        DSEntity.DS_CONTAINER.toString(), domainId, ugi);
    TimelineEvent endEvent = new TimelineEvent();
    endEvent.setTimestamp(System.currentTimeMillis());
    endEvent.setEventType(DSEvent.DS_CONTAINER_END.toString());
    endEvent.addEventInfo("State", container.getState().name());
    endEvent.addEventInfo("Exit Status", container.getExitStatus());
    entity.addEvent(endEvent);

    try {
      putEntityAsUser(timelineClient, entity, ugi);
    } catch (Exception e) {
      LOG.error("Container end event could not be published for "
          + container.getContainerId().toString(),
          unwrapUndeclaredThrowable(e));
    }
  }

  /**
   * Publishes an application attempt event to the timeline client as the
   * given user. A failure to publish is logged and not propagated.
   *
   * @param timelineClient client used to put the entity
   * @param appAttemptId id of the application attempt, used as entity id
   * @param appEvent the event to publish
   * @param domainId domain id set on the entity
   * @param ugi user that owns the entity and performs the put
   */
  private static void publishApplicationAttemptEvent(
      final TimelineClient timelineClient, String appAttemptId,
      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
    final TimelineEntity entity = newTimelineEntityFor(
        appAttemptId, DSEntity.DS_APP_ATTEMPT.toString(), domainId, ugi);
    TimelineEvent attemptEvent = new TimelineEvent();
    attemptEvent.setEventType(appEvent.toString());
    attemptEvent.setTimestamp(System.currentTimeMillis());
    entity.addEvent(attemptEvent);

    try {
      putEntityAsUser(timelineClient, entity, ugi);
    } catch (Exception e) {
      String phase;
      if (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START)) {
        phase = "start";
      } else {
        phase = "end";
      }
      LOG.error("App Attempt "
          + phase
          + " event could not be published for "
          + appAttemptId.toString(),
          unwrapUndeclaredThrowable(e));
    }
  }

  /**
   * Creates a timeline entity with the given id, type and domain, filtered
   * by the short name of the given user.
   */
  private static TimelineEntity newTimelineEntityFor(String entityId,
      String entityType, String domainId, UserGroupInformation ugi) {
    TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(entityId);
    entity.setEntityType(entityType);
    entity.setDomainId(domainId);
    entity.addPrimaryFilter("user", ugi.getShortUserName());
    return entity;
  }

  /** Puts the entity through the timeline client while acting as the user. */
  private static void putEntityAsUser(final TimelineClient timelineClient,
      final TimelineEntity entity, UserGroupInformation ugi)
      throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
      @Override
      public TimelinePutResponse run() throws Exception {
        return timelineClient.putEntities(entity);
      }
    });
  }

  /**
   * Returns the cause of an {@link UndeclaredThrowableException}, or the
   * exception itself for any other type.
   */
  private static Throwable unwrapUndeclaredThrowable(Exception e) {
    return e instanceof UndeclaredThrowableException ? e.getCause() : e;
  }
}