001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Vector; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import org.apache.commons.cli.CommandLine; 043import org.apache.commons.cli.GnuParser; 044import org.apache.commons.cli.HelpFormatter; 045import org.apache.commons.cli.Options; 046import org.apache.commons.cli.ParseException; 047import org.apache.commons.logging.Log; 048import org.apache.commons.logging.LogFactory; 049import org.apache.hadoop.classification.InterfaceAudience; 050import org.apache.hadoop.classification.InterfaceAudience.Private; 051import org.apache.hadoop.classification.InterfaceStability; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FileSystem; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.io.DataOutputBuffer; 056import org.apache.hadoop.io.IOUtils; 057import org.apache.hadoop.net.NetUtils; 058import org.apache.hadoop.security.Credentials; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.hadoop.security.token.Token; 061import org.apache.hadoop.util.ExitUtil; 062import org.apache.hadoop.util.Shell; 063import org.apache.hadoop.yarn.api.ApplicationConstants; 064import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 065import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 066import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 067import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 068import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 069import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 072import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 073import org.apache.hadoop.yarn.api.records.Container; 074import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 075import org.apache.hadoop.yarn.api.records.ContainerId; 076import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 077import org.apache.hadoop.yarn.api.records.ContainerState; 078import org.apache.hadoop.yarn.api.records.ContainerStatus; 079import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 080import org.apache.hadoop.yarn.api.records.LocalResource; 081import org.apache.hadoop.yarn.api.records.LocalResourceType; 082import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 083import org.apache.hadoop.yarn.api.records.NodeReport; 084import org.apache.hadoop.yarn.api.records.Priority; 085import org.apache.hadoop.yarn.api.records.Resource; 086import org.apache.hadoop.yarn.api.records.ResourceRequest; 087import org.apache.hadoop.yarn.api.records.URL; 088import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 089import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 090import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 091import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 092import org.apache.hadoop.yarn.client.api.TimelineClient; 093import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 094import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 095import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 096import org.apache.hadoop.yarn.conf.YarnConfiguration; 097import org.apache.hadoop.yarn.exceptions.YarnException; 098import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 099import org.apache.hadoop.yarn.util.ConverterUtils; 100import org.apache.log4j.LogManager; 101 102import com.google.common.annotations.VisibleForTesting; 103 104/** 105 * An ApplicationMaster for executing shell commands on a set of launched 106 * containers using the YARN framework. 107 * 108 * <p> 109 * This class is meant to act as an example on how to write yarn-based 110 * application masters. 111 * </p> 112 * 113 * <p> 114 * The ApplicationMaster is started on a container by the 115 * <code>ResourceManager</code>'s launcher. The first thing that the 116 * <code>ApplicationMaster</code> needs to do is to connect and register itself 117 * with the <code>ResourceManager</code>. The registration sets up information 118 * within the <code>ResourceManager</code> regarding what host:port the 119 * ApplicationMaster is listening on to provide any form of functionality to a 120 * client as well as a tracking url that a client can use to keep track of 121 * status/job history if needed. However, in the distributedshell, trackingurl 122 * and appMasterHost:appMasterRpcPort are not supported. 123 * </p> 124 * 125 * <p> 126 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 127 * <code>ResourceManager</code> at regular intervals to inform the 128 * <code>ResourceManager</code> that it is up and alive. The 129 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 130 * <code>ApplicationMaster</code> acts as a heartbeat. 131 * 132 * <p> 133 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 134 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 135 * required no. of containers using {@link ResourceRequest} with the necessary 136 * resource specifications such as node location, computational 137 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 138 * responds with an {@link AllocateResponse} that informs the 139 * <code>ApplicationMaster</code> of the set of newly allocated containers, 140 * completed containers as well as current state of available resources. 141 * </p> 142 * 143 * <p> 144 * For each allocated container, the <code>ApplicationMaster</code> can then set 145 * up the necessary launch context via {@link ContainerLaunchContext} to specify 146 * the allocated container id, local resources required by the executable, the 147 * environment to be setup for the executable, commands to execute, etc. and 148 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 149 * launch and execute the defined commands on the given allocated container. 150 * </p> 151 * 152 * <p> 153 * The <code>ApplicationMaster</code> can monitor the launched container by 154 * either querying the <code>ResourceManager</code> using 155 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 156 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 157 * container's {@link ContainerId}. 158 * 159 * <p> 160 * After the job has been completed, the <code>ApplicationMaster</code> has to 161 * send a {@link FinishApplicationMasterRequest} to the 162 * <code>ResourceManager</code> to inform it that the 163 * <code>ApplicationMaster</code> has been completed. 164 */ 165@InterfaceAudience.Public 166@InterfaceStability.Unstable 167public class ApplicationMaster { 168 169 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 170 171 @VisibleForTesting 172 @Private 173 public static enum DSEvent { 174 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 175 } 176 177 @VisibleForTesting 178 @Private 179 public static enum DSEntity { 180 DS_APP_ATTEMPT, DS_CONTAINER 181 } 182 183 // Configuration 184 private Configuration conf; 185 186 // Handle to communicate with the Resource Manager 187 @SuppressWarnings("rawtypes") 188 private AMRMClientAsync amRMClient; 189 190 // In both secure and non-secure modes, this points to the job-submitter. 191 private UserGroupInformation appSubmitterUgi; 192 193 // Handle to communicate with the Node Manager 194 private NMClientAsync nmClientAsync; 195 // Listen to process the response from the Node Manager 196 private NMCallbackHandler containerListener; 197 198 // Application Attempt Id ( combination of attemptId and fail count ) 199 @VisibleForTesting 200 protected ApplicationAttemptId appAttemptID; 201 202 // TODO 203 // For status update for clients - yet to be implemented 204 // Hostname of the container 205 private String appMasterHostname = ""; 206 // Port on which the app master listens for status updates from clients 207 private int appMasterRpcPort = -1; 208 // Tracking url to which app master publishes info for clients to monitor 209 private String appMasterTrackingUrl = ""; 210 211 // App Master configuration 212 // No. of containers to run shell command on 213 @VisibleForTesting 214 protected int numTotalContainers = 1; 215 // Memory to request for the container on which the shell command will run 216 private int containerMemory = 10; 217 // VirtualCores to request for the container on which the shell command will run 218 private int containerVirtualCores = 1; 219 // Priority of the request 220 private int requestPriority; 221 222 // Counter for completed containers ( complete denotes successful or failed ) 223 private AtomicInteger numCompletedContainers = new AtomicInteger(); 224 // Allocated container count so that we know how many containers has the RM 225 // allocated to us 226 @VisibleForTesting 227 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 228 // Count of failed containers 229 private AtomicInteger numFailedContainers = new AtomicInteger(); 230 // Count of containers already requested from the RM 231 // Needed as once requested, we should not request for containers again. 232 // Only request for more if the original requirement changes. 233 @VisibleForTesting 234 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 235 236 // Shell command to be executed 237 private String shellCommand = ""; 238 // Args to be passed to the shell command 239 private String shellArgs = ""; 240 // Env variables to be setup for the shell command 241 private Map<String, String> shellEnv = new HashMap<String, String>(); 242 243 // Location of shell script ( obtained from info set in env ) 244 // Shell script path in fs 245 private String scriptPath = ""; 246 // Timestamp needed for creating a local resource 247 private long shellScriptPathTimestamp = 0; 248 // File length needed for local resource 249 private long shellScriptPathLen = 0; 250 251 // Timeline domain ID 252 private String domainId = null; 253 254 // Hardcoded path to shell script in launch container's local env 255 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; 256 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH 257 + ".bat"; 258 259 // Hardcoded path to custom log_properties 260 private static final String log4jPath = "log4j.properties"; 261 262 private static final String shellCommandPath = "shellCommands"; 263 private static final String shellArgsPath = "shellArgs"; 264 265 private volatile boolean done; 266 267 private ByteBuffer allTokens; 268 269 // Launch threads 270 private List<Thread> launchThreads = new ArrayList<Thread>(); 271 272 // Timeline Client 273 private TimelineClient timelineClient; 274 275 private final String linux_bash_command = "bash"; 276 private final String windows_command = "cmd /c"; 277 278 /** 279 * @param args Command line args 280 */ 281 public static void main(String[] args) { 282 boolean result = false; 283 try { 284 ApplicationMaster appMaster = new ApplicationMaster(); 285 LOG.info("Initializing ApplicationMaster"); 286 boolean doRun = appMaster.init(args); 287 if (!doRun) { 288 System.exit(0); 289 } 290 appMaster.run(); 291 result = appMaster.finish(); 292 } catch (Throwable t) { 293 LOG.fatal("Error running ApplicationMaster", t); 294 LogManager.shutdown(); 295 ExitUtil.terminate(1, t); 296 } 297 if (result) { 298 LOG.info("Application Master completed successfully. exiting"); 299 System.exit(0); 300 } else { 301 LOG.info("Application Master failed. exiting"); 302 System.exit(2); 303 } 304 } 305 306 /** 307 * Dump out contents of $CWD and the environment to stdout for debugging 308 */ 309 private void dumpOutDebugInfo() { 310 311 LOG.info("Dump debug output"); 312 Map<String, String> envs = System.getenv(); 313 for (Map.Entry<String, String> env : envs.entrySet()) { 314 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 315 System.out.println("System env: key=" + env.getKey() + ", val=" 316 + env.getValue()); 317 } 318 319 BufferedReader buf = null; 320 try { 321 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : 322 Shell.execCommand("ls", "-al"); 323 buf = new BufferedReader(new StringReader(lines)); 324 String line = ""; 325 while ((line = buf.readLine()) != null) { 326 LOG.info("System CWD content: " + line); 327 System.out.println("System CWD content: " + line); 328 } 329 } catch (IOException e) { 330 e.printStackTrace(); 331 } finally { 332 IOUtils.cleanup(LOG, buf); 333 } 334 } 335 336 public ApplicationMaster() { 337 // Set up the configuration 338 conf = new YarnConfiguration(); 339 } 340 341 /** 342 * Parse command line options 343 * 344 * @param args Command line args 345 * @return Whether init successful and run should be invoked 346 * @throws ParseException 347 * @throws IOException 348 */ 349 public boolean init(String[] args) throws ParseException, IOException { 350 Options opts = new Options(); 351 opts.addOption("app_attempt_id", true, 352 "App Attempt ID. Not to be used unless for testing purposes"); 353 opts.addOption("shell_env", true, 354 "Environment for shell script. Specified as env_key=env_val pairs"); 355 opts.addOption("container_memory", true, 356 "Amount of memory in MB to be requested to run the shell command"); 357 opts.addOption("container_vcores", true, 358 "Amount of virtual cores to be requested to run the shell command"); 359 opts.addOption("num_containers", true, 360 "No. of containers on which the shell command needs to be executed"); 361 opts.addOption("priority", true, "Application Priority. Default 0"); 362 opts.addOption("debug", false, "Dump out debug information"); 363 364 opts.addOption("help", false, "Print usage"); 365 CommandLine cliParser = new GnuParser().parse(opts, args); 366 367 if (args.length == 0) { 368 printUsage(opts); 369 throw new IllegalArgumentException( 370 "No args specified for application master to initialize"); 371 } 372 373 //Check whether customer log4j.properties file exists 374 if (fileExist(log4jPath)) { 375 try { 376 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 377 log4jPath); 378 } catch (Exception e) { 379 LOG.warn("Can not set up custom log4j properties. " + e); 380 } 381 } 382 383 if (cliParser.hasOption("help")) { 384 printUsage(opts); 385 return false; 386 } 387 388 if (cliParser.hasOption("debug")) { 389 dumpOutDebugInfo(); 390 } 391 392 Map<String, String> envs = System.getenv(); 393 394 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 395 if (cliParser.hasOption("app_attempt_id")) { 396 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 397 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 398 } else { 399 throw new IllegalArgumentException( 400 "Application Attempt Id not set in the environment"); 401 } 402 } else { 403 ContainerId containerId = ConverterUtils.toContainerId(envs 404 .get(Environment.CONTAINER_ID.name())); 405 appAttemptID = containerId.getApplicationAttemptId(); 406 } 407 408 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 409 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 410 + " not set in the environment"); 411 } 412 if (!envs.containsKey(Environment.NM_HOST.name())) { 413 throw new RuntimeException(Environment.NM_HOST.name() 414 + " not set in the environment"); 415 } 416 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 417 throw new RuntimeException(Environment.NM_HTTP_PORT 418 + " not set in the environment"); 419 } 420 if (!envs.containsKey(Environment.NM_PORT.name())) { 421 throw new RuntimeException(Environment.NM_PORT.name() 422 + " not set in the environment"); 423 } 424 425 LOG.info("Application master for app" + ", appId=" 426 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 427 + appAttemptID.getApplicationId().getClusterTimestamp() 428 + ", attemptId=" + appAttemptID.getAttemptId()); 429 430 if (!fileExist(shellCommandPath) 431 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 432 throw new IllegalArgumentException( 433 "No shell command or shell script specified to be executed by application master"); 434 } 435 436 if (fileExist(shellCommandPath)) { 437 shellCommand = readContent(shellCommandPath); 438 } 439 440 if (fileExist(shellArgsPath)) { 441 shellArgs = readContent(shellArgsPath); 442 } 443 444 if (cliParser.hasOption("shell_env")) { 445 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 446 for (String env : shellEnvs) { 447 env = env.trim(); 448 int index = env.indexOf('='); 449 if (index == -1) { 450 shellEnv.put(env, ""); 451 continue; 452 } 453 String key = env.substring(0, index); 454 String val = ""; 455 if (index < (env.length() - 1)) { 456 val = env.substring(index + 1); 457 } 458 shellEnv.put(key, val); 459 } 460 } 461 462 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 463 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 464 465 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 466 shellScriptPathTimestamp = Long.valueOf(envs 467 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 468 } 469 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 470 shellScriptPathLen = Long.valueOf(envs 471 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 472 } 473 if (!scriptPath.isEmpty() 474 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 475 LOG.error("Illegal values in env for shell script path" + ", path=" 476 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 477 + shellScriptPathTimestamp); 478 throw new IllegalArgumentException( 479 "Illegal values in env for shell script path"); 480 } 481 } 482 483 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 484 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 485 } 486 487 containerMemory = Integer.parseInt(cliParser.getOptionValue( 488 "container_memory", "10")); 489 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 490 "container_vcores", "1")); 491 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 492 "num_containers", "1")); 493 if (numTotalContainers == 0) { 494 throw new IllegalArgumentException( 495 "Cannot run distributed shell with no containers"); 496 } 497 requestPriority = Integer.parseInt(cliParser 498 .getOptionValue("priority", "0")); 499 500 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 501 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 502 // Creating the Timeline Client 503 timelineClient = TimelineClient.createTimelineClient(); 504 timelineClient.init(conf); 505 timelineClient.start(); 506 } else { 507 timelineClient = null; 508 LOG.warn("Timeline service is not enabled"); 509 } 510 511 return true; 512 } 513 514 /** 515 * Helper function to print usage 516 * 517 * @param opts Parsed command line options 518 */ 519 private void printUsage(Options opts) { 520 new HelpFormatter().printHelp("ApplicationMaster", opts); 521 } 522 523 /** 524 * Main run function for the application master 525 * 526 * @throws YarnException 527 * @throws IOException 528 */ 529 @SuppressWarnings({ "unchecked" }) 530 public void run() throws YarnException, IOException { 531 LOG.info("Starting ApplicationMaster"); 532 533 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 534 // are marked as LimitedPrivate 535 Credentials credentials = 536 UserGroupInformation.getCurrentUser().getCredentials(); 537 DataOutputBuffer dob = new DataOutputBuffer(); 538 credentials.writeTokenStorageToStream(dob); 539 // Now remove the AM->RM token so that containers cannot access it. 540 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 541 LOG.info("Executing with tokens:"); 542 while (iter.hasNext()) { 543 Token<?> token = iter.next(); 544 LOG.info(token); 545 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 546 iter.remove(); 547 } 548 } 549 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 550 551 // Create appSubmitterUgi and add original tokens to it 552 String appSubmitterUserName = 553 System.getenv(ApplicationConstants.Environment.USER.name()); 554 appSubmitterUgi = 555 UserGroupInformation.createRemoteUser(appSubmitterUserName); 556 appSubmitterUgi.addCredentials(credentials); 557 558 if(timelineClient != null) { 559 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 560 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 561 } 562 563 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 564 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 565 amRMClient.init(conf); 566 amRMClient.start(); 567 568 containerListener = createNMCallbackHandler(); 569 nmClientAsync = new NMClientAsyncImpl(containerListener); 570 nmClientAsync.init(conf); 571 nmClientAsync.start(); 572 573 // Setup local RPC Server to accept status requests directly from clients 574 // TODO need to setup a protocol for client to be able to communicate to 575 // the RPC server 576 // TODO use the rpc port info to register with the RM for the client to 577 // send requests to this app master 578 579 // Register self with ResourceManager 580 // This will start heartbeating to the RM 581 appMasterHostname = NetUtils.getHostname(); 582 RegisterApplicationMasterResponse response = amRMClient 583 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 584 appMasterTrackingUrl); 585 // Dump out information about cluster capability as seen by the 586 // resource manager 587 int maxMem = response.getMaximumResourceCapability().getMemory(); 588 LOG.info("Max mem capabililty of resources in this cluster " + maxMem); 589 590 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 591 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); 592 593 // A resource ask cannot exceed the max. 594 if (containerMemory > maxMem) { 595 LOG.info("Container memory specified above max threshold of cluster." 596 + " Using max value." + ", specified=" + containerMemory + ", max=" 597 + maxMem); 598 containerMemory = maxMem; 599 } 600 601 if (containerVirtualCores > maxVCores) { 602 LOG.info("Container virtual cores specified above max threshold of cluster." 603 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 604 + maxVCores); 605 containerVirtualCores = maxVCores; 606 } 607 608 List<Container> previousAMRunningContainers = 609 response.getContainersFromPreviousAttempts(); 610 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 611 + " previous attempts' running containers on AM registration."); 612 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 613 614 int numTotalContainersToRequest = 615 numTotalContainers - previousAMRunningContainers.size(); 616 // Setup ask for containers from RM 617 // Send request for containers to RM 618 // Until we get our fully allocated quota, we keep on polling RM for 619 // containers 620 // Keep looping until all the containers are launched and shell script 621 // executed on them ( regardless of success/failure). 622 for (int i = 0; i < numTotalContainersToRequest; ++i) { 623 ContainerRequest containerAsk = setupContainerAskForRM(); 624 amRMClient.addContainerRequest(containerAsk); 625 } 626 numRequestedContainers.set(numTotalContainers); 627 628 if(timelineClient != null) { 629 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 630 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 631 } 632 } 633 634 @VisibleForTesting 635 NMCallbackHandler createNMCallbackHandler() { 636 return new NMCallbackHandler(this); 637 } 638 639 @VisibleForTesting 640 protected boolean finish() { 641 // wait for completion. 642 while (!done 643 && (numCompletedContainers.get() != numTotalContainers)) { 644 try { 645 Thread.sleep(200); 646 } catch (InterruptedException ex) {} 647 } 648 649 // Join all launched threads 650 // needed for when we time out 651 // and we need to release containers 652 for (Thread launchThread : launchThreads) { 653 try { 654 launchThread.join(10000); 655 } catch (InterruptedException e) { 656 LOG.info("Exception thrown in thread join: " + e.getMessage()); 657 e.printStackTrace(); 658 } 659 } 660 661 // When the application completes, it should stop all running containers 662 LOG.info("Application completed. Stopping running containers"); 663 nmClientAsync.stop(); 664 665 // When the application completes, it should send a finish application 666 // signal to the RM 667 LOG.info("Application completed. Signalling finish to RM"); 668 669 FinalApplicationStatus appStatus; 670 String appMessage = null; 671 boolean success = true; 672 if (numFailedContainers.get() == 0 && 673 numCompletedContainers.get() == numTotalContainers) { 674 appStatus = FinalApplicationStatus.SUCCEEDED; 675 } else { 676 appStatus = FinalApplicationStatus.FAILED; 677 appMessage = "Diagnostics." + ", total=" + numTotalContainers 678 + ", completed=" + numCompletedContainers.get() + ", allocated=" 679 + numAllocatedContainers.get() + ", failed=" 680 + numFailedContainers.get(); 681 LOG.info(appMessage); 682 success = false; 683 } 684 try { 685 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 686 } catch (YarnException ex) { 687 LOG.error("Failed to unregister application", ex); 688 } catch (IOException e) { 689 LOG.error("Failed to unregister application", e); 690 } 691 692 amRMClient.stop(); 693 694 // Stop Timeline Client 695 if(timelineClient != null) { 696 timelineClient.stop(); 697 } 698 699 return success; 700 } 701 702 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 703 @SuppressWarnings("unchecked") 704 @Override 705 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 706 LOG.info("Got response from RM for container ask, completedCnt=" 707 + completedContainers.size()); 708 for (ContainerStatus containerStatus : completedContainers) { 709 LOG.info(appAttemptID + " got container status for containerID=" 710 + containerStatus.getContainerId() + ", state=" 711 + containerStatus.getState() + ", exitStatus=" 712 + containerStatus.getExitStatus() + ", diagnostics=" 713 + containerStatus.getDiagnostics()); 714 715 // non complete containers should not be here 716 assert (containerStatus.getState() == ContainerState.COMPLETE); 717 718 // increment counters for completed/failed containers 719 int exitStatus = containerStatus.getExitStatus(); 720 if (0 != exitStatus) { 721 // container failed 722 if (ContainerExitStatus.ABORTED != exitStatus) { 723 // shell script failed 724 // counts as completed 725 numCompletedContainers.incrementAndGet(); 726 numFailedContainers.incrementAndGet(); 727 } else { 728 // container was killed by framework, possibly preempted 729 // we should re-try as the container was lost for some reason 730 numAllocatedContainers.decrementAndGet(); 731 numRequestedContainers.decrementAndGet(); 732 // we do not need to release the container as it would be done 733 // by the RM 734 } 735 } else { 736 // nothing to do 737 // container completed successfully 738 numCompletedContainers.incrementAndGet(); 739 LOG.info("Container completed successfully." + ", containerId=" 740 + containerStatus.getContainerId()); 741 } 742 if(timelineClient != null) { 743 publishContainerEndEvent( 744 timelineClient, containerStatus, domainId, appSubmitterUgi); 745 } 746 } 747 748 // ask for more containers if any failed 749 int askCount = numTotalContainers - numRequestedContainers.get(); 750 numRequestedContainers.addAndGet(askCount); 751 752 if (askCount > 0) { 753 for (int i = 0; i < askCount; ++i) { 754 ContainerRequest containerAsk = setupContainerAskForRM(); 755 amRMClient.addContainerRequest(containerAsk); 756 } 757 } 758 759 if (numCompletedContainers.get() == numTotalContainers) { 760 done = true; 761 } 762 } 763 764 @Override 765 public void onContainersAllocated(List<Container> allocatedContainers) { 766 LOG.info("Got response from RM for container ask, allocatedCnt=" 767 + allocatedContainers.size()); 768 numAllocatedContainers.addAndGet(allocatedContainers.size()); 769 for (Container allocatedContainer : allocatedContainers) { 770 LOG.info("Launching shell command on a new container." 771 + ", containerId=" + allocatedContainer.getId() 772 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 773 + ":" + allocatedContainer.getNodeId().getPort() 774 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 775 + ", containerResourceMemory" 776 + allocatedContainer.getResource().getMemory() 777 + ", containerResourceVirtualCores" 778 + allocatedContainer.getResource().getVirtualCores()); 779 // + ", containerToken" 780 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 781 782 LaunchContainerRunnable runnableLaunchContainer = 783 new LaunchContainerRunnable(allocatedContainer, containerListener); 784 Thread launchThread = new Thread(runnableLaunchContainer); 785 786 // launch and start the container on a separate thread to keep 787 // the main thread unblocked 788 // as all containers may not be allocated at one go. 789 launchThreads.add(launchThread); 790 launchThread.start(); 791 } 792 } 793 794 @Override 795 public void onShutdownRequest() { 796 done = true; 797 } 798 799 @Override 800 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 801 802 @Override 803 public float getProgress() { 804 // set progress to deliver to RM on next heartbeat 805 float progress = (float) numCompletedContainers.get() 806 / numTotalContainers; 807 return progress; 808 } 809 810 @Override 811 public void onError(Throwable e) { 812 done = true; 813 amRMClient.stop(); 814 } 815 } 816 817 @VisibleForTesting 818 static class NMCallbackHandler 819 implements NMClientAsync.CallbackHandler { 820 821 private ConcurrentMap<ContainerId, Container> containers = 822 new ConcurrentHashMap<ContainerId, Container>(); 823 private final ApplicationMaster applicationMaster; 824 825 public NMCallbackHandler(ApplicationMaster applicationMaster) { 826 this.applicationMaster = applicationMaster; 827 } 828 829 public void addContainer(ContainerId containerId, Container container) { 830 containers.putIfAbsent(containerId, container); 831 } 832 833 @Override 834 public void onContainerStopped(ContainerId containerId) { 835 if (LOG.isDebugEnabled()) { 836 LOG.debug("Succeeded to stop Container " + containerId); 837 } 838 containers.remove(containerId); 839 } 840 841 @Override 842 public void onContainerStatusReceived(ContainerId containerId, 843 ContainerStatus containerStatus) { 844 if (LOG.isDebugEnabled()) { 845 LOG.debug("Container Status: id=" + containerId + ", status=" + 846 containerStatus); 847 } 848 } 849 850 @Override 851 public void onContainerStarted(ContainerId containerId, 852 Map<String, ByteBuffer> allServiceResponse) { 853 if (LOG.isDebugEnabled()) { 854 LOG.debug("Succeeded to start Container " + containerId); 855 } 856 Container container = containers.get(containerId); 857 if (container != null) { 858 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 859 } 860 if(applicationMaster.timelineClient != null) { 861 ApplicationMaster.publishContainerStartEvent( 862 applicationMaster.timelineClient, container, 863 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 864 } 865 } 866 867 @Override 868 public void onStartContainerError(ContainerId containerId, Throwable t) { 869 LOG.error("Failed to start Container " + containerId); 870 containers.remove(containerId); 871 applicationMaster.numCompletedContainers.incrementAndGet(); 872 applicationMaster.numFailedContainers.incrementAndGet(); 873 } 874 875 @Override 876 public void onGetContainerStatusError( 877 ContainerId containerId, Throwable t) { 878 LOG.error("Failed to query the status of Container " + containerId); 879 } 880 881 @Override 882 public void onStopContainerError(ContainerId containerId, Throwable t) { 883 LOG.error("Failed to stop Container " + containerId); 884 containers.remove(containerId); 885 } 886 } 887 888 /** 889 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 890 * that will execute the shell command. 891 */ 892 private class LaunchContainerRunnable implements Runnable { 893 894 // Allocated container 895 Container container; 896 897 NMCallbackHandler containerListener; 898 899 /** 900 * @param lcontainer Allocated container 901 * @param containerListener Callback handler of the container 902 */ 903 public LaunchContainerRunnable( 904 Container lcontainer, NMCallbackHandler containerListener) { 905 this.container = lcontainer; 906 this.containerListener = containerListener; 907 } 908 909 @Override 910 /** 911 * Connects to CM, sets up container launch context 912 * for shell command and eventually dispatches the container 913 * start request to the CM. 914 */ 915 public void run() { 916 LOG.info("Setting up container launch container for containerid=" 917 + container.getId()); 918 919 // Set the local resources 920 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 921 922 // The container for the eventual shell commands needs its own local 923 // resources too. 924 // In this scenario, if a shell script is specified, we need to have it 925 // copied and made available to the container. 926 if (!scriptPath.isEmpty()) { 927 Path renamedScriptPath = null; 928 if (Shell.WINDOWS) { 929 renamedScriptPath = new Path(scriptPath + ".bat"); 930 } else { 931 renamedScriptPath = new Path(scriptPath + ".sh"); 932 } 933 934 try { 935 // rename the script file based on the underlying OS syntax. 936 renameScriptFile(renamedScriptPath); 937 } catch (Exception e) { 938 LOG.error( 939 "Not able to add suffix (.bat/.sh) to the shell script filename", 940 e); 941 // We know we cannot continue launching the container 942 // so we should release it. 943 numCompletedContainers.incrementAndGet(); 944 numFailedContainers.incrementAndGet(); 945 return; 946 } 947 948 URL yarnUrl = null; 949 try { 950 yarnUrl = ConverterUtils.getYarnUrlFromURI( 951 new URI(renamedScriptPath.toString())); 952 } catch (URISyntaxException e) { 953 LOG.error("Error when trying to use shell script path specified" 954 + " in env, path=" + renamedScriptPath, e); 955 // A failure scenario on bad input such as invalid shell script path 956 // We know we cannot continue launching the container 957 // so we should release it. 958 // TODO 959 numCompletedContainers.incrementAndGet(); 960 numFailedContainers.incrementAndGet(); 961 return; 962 } 963 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 964 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 965 shellScriptPathLen, shellScriptPathTimestamp); 966 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 967 ExecShellStringPath, shellRsrc); 968 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 969 } 970 971 // Set the necessary command to execute on the allocated container 972 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 973 974 // Set executable command 975 vargs.add(shellCommand); 976 // Set shell script path 977 if (!scriptPath.isEmpty()) { 978 vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath 979 : ExecShellStringPath); 980 } 981 982 // Set args for the shell command if any 983 vargs.add(shellArgs); 984 // Add log redirect params 985 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 986 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 987 988 // Get final commmand 989 StringBuilder command = new StringBuilder(); 990 for (CharSequence str : vargs) { 991 command.append(str).append(" "); 992 } 993 994 List<String> commands = new ArrayList<String>(); 995 commands.add(command.toString()); 996 997 // Set up ContainerLaunchContext, setting local resource, environment, 998 // command and token for constructor. 999 1000 // Note for tokens: Set up tokens for the container too. Today, for normal 1001 // shell commands, the container in distribute-shell doesn't need any 1002 // tokens. We are populating them mainly for NodeManagers to be able to 1003 // download anyfiles in the distributed file-system. The tokens are 1004 // otherwise also useful in cases, for e.g., when one is running a 1005 // "hadoop dfs" command inside the distributed shell. 1006 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1007 localResources, shellEnv, commands, null, allTokens.duplicate(), null); 1008 containerListener.addContainer(container.getId(), container); 1009 nmClientAsync.startContainerAsync(container, ctx); 1010 } 1011 } 1012 1013 private void renameScriptFile(final Path renamedScriptPath) 1014 throws IOException, InterruptedException { 1015 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1016 @Override 1017 public Void run() throws IOException { 1018 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1019 fs.rename(new Path(scriptPath), renamedScriptPath); 1020 return null; 1021 } 1022 }); 1023 LOG.info("User " + appSubmitterUgi.getUserName() 1024 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1025 } 1026 1027 /** 1028 * Setup the request that will be sent to the RM for the container ask. 1029 * 1030 * @return the setup ResourceRequest to be sent to RM 1031 */ 1032 private ContainerRequest setupContainerAskForRM() { 1033 // setup requirements for hosts 1034 // using * as any host will do for the distributed shell app 1035 // set the priority for the request 1036 // TODO - what is the range for priority? how to decide? 1037 Priority pri = Priority.newInstance(requestPriority); 1038 1039 // Set up resource type requirements 1040 // For now, memory and CPU are supported so we set memory and cpu requirements 1041 Resource capability = Resource.newInstance(containerMemory, 1042 containerVirtualCores); 1043 1044 ContainerRequest request = new ContainerRequest(capability, null, null, 1045 pri); 1046 LOG.info("Requested container ask: " + request.toString()); 1047 return request; 1048 } 1049 1050 private boolean fileExist(String filePath) { 1051 return new File(filePath).exists(); 1052 } 1053 1054 private String readContent(String filePath) throws IOException { 1055 DataInputStream ds = null; 1056 try { 1057 ds = new DataInputStream(new FileInputStream(filePath)); 1058 return ds.readUTF(); 1059 } finally { 1060 org.apache.commons.io.IOUtils.closeQuietly(ds); 1061 } 1062 } 1063 1064 private static void publishContainerStartEvent( 1065 final TimelineClient timelineClient, Container container, String domainId, 1066 UserGroupInformation ugi) { 1067 final TimelineEntity entity = new TimelineEntity(); 1068 entity.setEntityId(container.getId().toString()); 1069 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1070 entity.setDomainId(domainId); 1071 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1072 TimelineEvent event = new TimelineEvent(); 1073 event.setTimestamp(System.currentTimeMillis()); 1074 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1075 event.addEventInfo("Node", container.getNodeId().toString()); 1076 event.addEventInfo("Resources", container.getResource().toString()); 1077 entity.addEvent(event); 1078 1079 try { 1080 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1081 @Override 1082 public TimelinePutResponse run() throws Exception { 1083 return timelineClient.putEntities(entity); 1084 } 1085 }); 1086 } catch (Exception e) { 1087 LOG.error("Container start event could not be published for " 1088 + container.getId().toString(), 1089 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1090 } 1091 } 1092 1093 private static void publishContainerEndEvent( 1094 final TimelineClient timelineClient, ContainerStatus container, 1095 String domainId, UserGroupInformation ugi) { 1096 final TimelineEntity entity = new TimelineEntity(); 1097 entity.setEntityId(container.getContainerId().toString()); 1098 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1099 entity.setDomainId(domainId); 1100 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1101 TimelineEvent event = new TimelineEvent(); 1102 event.setTimestamp(System.currentTimeMillis()); 1103 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1104 event.addEventInfo("State", container.getState().name()); 1105 event.addEventInfo("Exit Status", container.getExitStatus()); 1106 entity.addEvent(event); 1107 1108 try { 1109 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1110 @Override 1111 public TimelinePutResponse run() throws Exception { 1112 return timelineClient.putEntities(entity); 1113 } 1114 }); 1115 } catch (Exception e) { 1116 LOG.error("Container end event could not be published for " 1117 + container.getContainerId().toString(), 1118 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1119 } 1120 } 1121 1122 private static void publishApplicationAttemptEvent( 1123 final TimelineClient timelineClient, String appAttemptId, 1124 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1125 final TimelineEntity entity = new TimelineEntity(); 1126 entity.setEntityId(appAttemptId); 1127 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1128 entity.setDomainId(domainId); 1129 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1130 TimelineEvent event = new TimelineEvent(); 1131 event.setEventType(appEvent.toString()); 1132 event.setTimestamp(System.currentTimeMillis()); 1133 entity.addEvent(event); 1134 1135 try { 1136 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1137 @Override 1138 public TimelinePutResponse run() throws Exception { 1139 return timelineClient.putEntities(entity); 1140 } 1141 }); 1142 } catch (Exception e) { 1143 LOG.error("App Attempt " 1144 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1145 + " event could not be published for " 1146 + appAttemptId.toString(), 1147 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1148 } 1149 } 1150}