001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Vector; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.atomic.AtomicInteger; 043 044import org.apache.commons.cli.CommandLine; 045import org.apache.commons.cli.GnuParser; 046import org.apache.commons.cli.HelpFormatter; 047import org.apache.commons.cli.Options; 048import org.apache.commons.cli.ParseException; 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.apache.hadoop.classification.InterfaceAudience; 052import org.apache.hadoop.classification.InterfaceAudience.Private; 053import org.apache.hadoop.classification.InterfaceStability; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.io.DataOutputBuffer; 058import org.apache.hadoop.io.IOUtils; 059import org.apache.hadoop.net.NetUtils; 060import org.apache.hadoop.security.Credentials; 061import org.apache.hadoop.security.UserGroupInformation; 062import org.apache.hadoop.security.token.Token; 063import org.apache.hadoop.util.ExitUtil; 064import org.apache.hadoop.util.Shell; 065import org.apache.hadoop.yarn.api.ApplicationConstants; 066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 068import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 075import org.apache.hadoop.yarn.api.records.Container; 076import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 077import org.apache.hadoop.yarn.api.records.ContainerId; 078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 079import org.apache.hadoop.yarn.api.records.ContainerState; 080import org.apache.hadoop.yarn.api.records.ContainerStatus; 081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 082import org.apache.hadoop.yarn.api.records.LocalResource; 083import org.apache.hadoop.yarn.api.records.LocalResourceType; 084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 085import org.apache.hadoop.yarn.api.records.NodeReport; 086import org.apache.hadoop.yarn.api.records.Priority; 087import org.apache.hadoop.yarn.api.records.Resource; 088import org.apache.hadoop.yarn.api.records.ResourceRequest; 089import org.apache.hadoop.yarn.api.records.URL; 090import org.apache.hadoop.yarn.api.records.UpdatedContainer; 091import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 092import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; 093import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 094import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 095import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 096import org.apache.hadoop.yarn.client.api.TimelineClient; 097import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 098import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 099import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 100import org.apache.hadoop.yarn.conf.YarnConfiguration; 101import org.apache.hadoop.yarn.exceptions.YarnException; 102import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 103import org.apache.hadoop.yarn.util.ConverterUtils; 104import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 105import org.apache.log4j.LogManager; 106 107import com.google.common.annotations.VisibleForTesting; 108import com.sun.jersey.api.client.ClientHandlerException; 109 110/** 111 * An ApplicationMaster for executing shell commands on a set of launched 112 * containers using the YARN framework. 113 * 114 * <p> 115 * This class is meant to act as an example on how to write yarn-based 116 * application masters. 117 * </p> 118 * 119 * <p> 120 * The ApplicationMaster is started on a container by the 121 * <code>ResourceManager</code>'s launcher. The first thing that the 122 * <code>ApplicationMaster</code> needs to do is to connect and register itself 123 * with the <code>ResourceManager</code>. The registration sets up information 124 * within the <code>ResourceManager</code> regarding what host:port the 125 * ApplicationMaster is listening on to provide any form of functionality to a 126 * client as well as a tracking url that a client can use to keep track of 127 * status/job history if needed. However, in the distributedshell, trackingurl 128 * and appMasterHost:appMasterRpcPort are not supported. 129 * </p> 130 * 131 * <p> 132 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 133 * <code>ResourceManager</code> at regular intervals to inform the 134 * <code>ResourceManager</code> that it is up and alive. The 135 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 136 * <code>ApplicationMaster</code> acts as a heartbeat. 137 * 138 * <p> 139 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 140 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 141 * required no. of containers using {@link ResourceRequest} with the necessary 142 * resource specifications such as node location, computational 143 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 144 * responds with an {@link AllocateResponse} that informs the 145 * <code>ApplicationMaster</code> of the set of newly allocated containers, 146 * completed containers as well as current state of available resources. 147 * </p> 148 * 149 * <p> 150 * For each allocated container, the <code>ApplicationMaster</code> can then set 151 * up the necessary launch context via {@link ContainerLaunchContext} to specify 152 * the allocated container id, local resources required by the executable, the 153 * environment to be setup for the executable, commands to execute, etc. and 154 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 155 * launch and execute the defined commands on the given allocated container. 156 * </p> 157 * 158 * <p> 159 * The <code>ApplicationMaster</code> can monitor the launched container by 160 * either querying the <code>ResourceManager</code> using 161 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 162 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 163 * container's {@link ContainerId}. 164 * 165 * <p> 166 * After the job has been completed, the <code>ApplicationMaster</code> has to 167 * send a {@link FinishApplicationMasterRequest} to the 168 * <code>ResourceManager</code> to inform it that the 169 * <code>ApplicationMaster</code> has been completed. 170 */ 171@InterfaceAudience.Public 172@InterfaceStability.Unstable 173public class ApplicationMaster { 174 175 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 176 177 @VisibleForTesting 178 @Private 179 public static enum DSEvent { 180 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 181 } 182 183 @VisibleForTesting 184 @Private 185 public static enum DSEntity { 186 DS_APP_ATTEMPT, DS_CONTAINER 187 } 188 189 private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; 190 191 // Configuration 192 private Configuration conf; 193 194 // Handle to communicate with the Resource Manager 195 @SuppressWarnings("rawtypes") 196 private AMRMClientAsync amRMClient; 197 198 // In both secure and non-secure modes, this points to the job-submitter. 199 @VisibleForTesting 200 UserGroupInformation appSubmitterUgi; 201 202 // Handle to communicate with the Node Manager 203 private NMClientAsync nmClientAsync; 204 // Listen to process the response from the Node Manager 205 private NMCallbackHandler containerListener; 206 207 // Application Attempt Id ( combination of attemptId and fail count ) 208 @VisibleForTesting 209 protected ApplicationAttemptId appAttemptID; 210 211 // TODO 212 // For status update for clients - yet to be implemented 213 // Hostname of the container 214 private String appMasterHostname = ""; 215 // Port on which the app master listens for status updates from clients 216 private int appMasterRpcPort = -1; 217 // Tracking url to which app master publishes info for clients to monitor 218 private String appMasterTrackingUrl = ""; 219 220 // App Master configuration 221 // No. of containers to run shell command on 222 @VisibleForTesting 223 protected int numTotalContainers = 1; 224 // Memory to request for the container on which the shell command will run 225 private long containerMemory = 10; 226 // VirtualCores to request for the container on which the shell command will run 227 private int containerVirtualCores = 1; 228 // Priority of the request 229 private int requestPriority; 230 231 // Counter for completed containers ( complete denotes successful or failed ) 232 private AtomicInteger numCompletedContainers = new AtomicInteger(); 233 // Allocated container count so that we know how many containers has the RM 234 // allocated to us 235 @VisibleForTesting 236 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 237 // Count of failed containers 238 private AtomicInteger numFailedContainers = new AtomicInteger(); 239 // Count of containers already requested from the RM 240 // Needed as once requested, we should not request for containers again. 241 // Only request for more if the original requirement changes. 242 @VisibleForTesting 243 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 244 245 // Shell command to be executed 246 private String shellCommand = ""; 247 // Args to be passed to the shell command 248 private String shellArgs = ""; 249 // Env variables to be setup for the shell command 250 private Map<String, String> shellEnv = new HashMap<String, String>(); 251 252 // Location of shell script ( obtained from info set in env ) 253 // Shell script path in fs 254 private String scriptPath = ""; 255 // Timestamp needed for creating a local resource 256 private long shellScriptPathTimestamp = 0; 257 // File length needed for local resource 258 private long shellScriptPathLen = 0; 259 260 // Timeline domain ID 261 private String domainId = null; 262 263 // Hardcoded path to shell script in launch container's local env 264 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; 265 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH 266 + ".bat"; 267 268 // Hardcoded path to custom log_properties 269 private static final String log4jPath = "log4j.properties"; 270 271 private static final String shellCommandPath = "shellCommands"; 272 private static final String shellArgsPath = "shellArgs"; 273 274 private volatile boolean done; 275 276 private ByteBuffer allTokens; 277 278 // Launch threads 279 private List<Thread> launchThreads = new ArrayList<Thread>(); 280 281 // Timeline Client 282 @VisibleForTesting 283 TimelineClient timelineClient; 284 static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; 285 static final String APPID_TIMELINE_FILTER_NAME = "appId"; 286 static final String USER_TIMELINE_FILTER_NAME = "user"; 287 288 private final String linux_bash_command = "bash"; 289 private final String windows_command = "cmd /c"; 290 291 private int yarnShellIdCounter = 1; 292 293 @VisibleForTesting 294 protected final Set<ContainerId> launchedContainers = 295 Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); 296 297 /** 298 * @param args Command line args 299 */ 300 public static void main(String[] args) { 301 boolean result = false; 302 try { 303 ApplicationMaster appMaster = new ApplicationMaster(); 304 LOG.info("Initializing ApplicationMaster"); 305 boolean doRun = appMaster.init(args); 306 if (!doRun) { 307 System.exit(0); 308 } 309 appMaster.run(); 310 result = appMaster.finish(); 311 } catch (Throwable t) { 312 LOG.fatal("Error running ApplicationMaster", t); 313 LogManager.shutdown(); 314 ExitUtil.terminate(1, t); 315 } 316 if (result) { 317 LOG.info("Application Master completed successfully. exiting"); 318 System.exit(0); 319 } else { 320 LOG.info("Application Master failed. exiting"); 321 System.exit(2); 322 } 323 } 324 325 /** 326 * Dump out contents of $CWD and the environment to stdout for debugging 327 */ 328 private void dumpOutDebugInfo() { 329 330 LOG.info("Dump debug output"); 331 Map<String, String> envs = System.getenv(); 332 for (Map.Entry<String, String> env : envs.entrySet()) { 333 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 334 System.out.println("System env: key=" + env.getKey() + ", val=" 335 + env.getValue()); 336 } 337 338 BufferedReader buf = null; 339 try { 340 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : 341 Shell.execCommand("ls", "-al"); 342 buf = new BufferedReader(new StringReader(lines)); 343 String line = ""; 344 while ((line = buf.readLine()) != null) { 345 LOG.info("System CWD content: " + line); 346 System.out.println("System CWD content: " + line); 347 } 348 } catch (IOException e) { 349 e.printStackTrace(); 350 } finally { 351 IOUtils.cleanup(LOG, buf); 352 } 353 } 354 355 public ApplicationMaster() { 356 // Set up the configuration 357 conf = new YarnConfiguration(); 358 } 359 360 /** 361 * Parse command line options 362 * 363 * @param args Command line args 364 * @return Whether init successful and run should be invoked 365 * @throws ParseException 366 * @throws IOException 367 */ 368 public boolean init(String[] args) throws ParseException, IOException { 369 Options opts = new Options(); 370 opts.addOption("app_attempt_id", true, 371 "App Attempt ID. Not to be used unless for testing purposes"); 372 opts.addOption("shell_env", true, 373 "Environment for shell script. Specified as env_key=env_val pairs"); 374 opts.addOption("container_memory", true, 375 "Amount of memory in MB to be requested to run the shell command"); 376 opts.addOption("container_vcores", true, 377 "Amount of virtual cores to be requested to run the shell command"); 378 opts.addOption("num_containers", true, 379 "No. of containers on which the shell command needs to be executed"); 380 opts.addOption("priority", true, "Application Priority. Default 0"); 381 opts.addOption("debug", false, "Dump out debug information"); 382 383 opts.addOption("help", false, "Print usage"); 384 CommandLine cliParser = new GnuParser().parse(opts, args); 385 386 if (args.length == 0) { 387 printUsage(opts); 388 throw new IllegalArgumentException( 389 "No args specified for application master to initialize"); 390 } 391 392 //Check whether customer log4j.properties file exists 393 if (fileExist(log4jPath)) { 394 try { 395 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 396 log4jPath); 397 } catch (Exception e) { 398 LOG.warn("Can not set up custom log4j properties. " + e); 399 } 400 } 401 402 if (cliParser.hasOption("help")) { 403 printUsage(opts); 404 return false; 405 } 406 407 if (cliParser.hasOption("debug")) { 408 dumpOutDebugInfo(); 409 } 410 411 Map<String, String> envs = System.getenv(); 412 413 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 414 if (cliParser.hasOption("app_attempt_id")) { 415 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 416 appAttemptID = ApplicationAttemptId.fromString(appIdStr); 417 } else { 418 throw new IllegalArgumentException( 419 "Application Attempt Id not set in the environment"); 420 } 421 } else { 422 ContainerId containerId = ContainerId.fromString(envs 423 .get(Environment.CONTAINER_ID.name())); 424 appAttemptID = containerId.getApplicationAttemptId(); 425 } 426 427 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 428 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 429 + " not set in the environment"); 430 } 431 if (!envs.containsKey(Environment.NM_HOST.name())) { 432 throw new RuntimeException(Environment.NM_HOST.name() 433 + " not set in the environment"); 434 } 435 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 436 throw new RuntimeException(Environment.NM_HTTP_PORT 437 + " not set in the environment"); 438 } 439 if (!envs.containsKey(Environment.NM_PORT.name())) { 440 throw new RuntimeException(Environment.NM_PORT.name() 441 + " not set in the environment"); 442 } 443 444 LOG.info("Application master for app" + ", appId=" 445 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 446 + appAttemptID.getApplicationId().getClusterTimestamp() 447 + ", attemptId=" + appAttemptID.getAttemptId()); 448 449 if (!fileExist(shellCommandPath) 450 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 451 throw new IllegalArgumentException( 452 "No shell command or shell script specified to be executed by application master"); 453 } 454 455 if (fileExist(shellCommandPath)) { 456 shellCommand = readContent(shellCommandPath); 457 } 458 459 if (fileExist(shellArgsPath)) { 460 shellArgs = readContent(shellArgsPath); 461 } 462 463 if (cliParser.hasOption("shell_env")) { 464 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 465 for (String env : shellEnvs) { 466 env = env.trim(); 467 int index = env.indexOf('='); 468 if (index == -1) { 469 shellEnv.put(env, ""); 470 continue; 471 } 472 String key = env.substring(0, index); 473 String val = ""; 474 if (index < (env.length() - 1)) { 475 val = env.substring(index + 1); 476 } 477 shellEnv.put(key, val); 478 } 479 } 480 481 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 482 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 483 484 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 485 shellScriptPathTimestamp = Long.parseLong(envs 486 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 487 } 488 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 489 shellScriptPathLen = Long.parseLong(envs 490 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 491 } 492 if (!scriptPath.isEmpty() 493 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 494 LOG.error("Illegal values in env for shell script path" + ", path=" 495 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 496 + shellScriptPathTimestamp); 497 throw new IllegalArgumentException( 498 "Illegal values in env for shell script path"); 499 } 500 } 501 502 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 503 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 504 } 505 506 containerMemory = Integer.parseInt(cliParser.getOptionValue( 507 "container_memory", "10")); 508 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 509 "container_vcores", "1")); 510 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 511 "num_containers", "1")); 512 if (numTotalContainers == 0) { 513 throw new IllegalArgumentException( 514 "Cannot run distributed shell with no containers"); 515 } 516 requestPriority = Integer.parseInt(cliParser 517 .getOptionValue("priority", "0")); 518 return true; 519 } 520 521 /** 522 * Helper function to print usage 523 * 524 * @param opts Parsed command line options 525 */ 526 private void printUsage(Options opts) { 527 new HelpFormatter().printHelp("ApplicationMaster", opts); 528 } 529 530 /** 531 * Main run function for the application master 532 * 533 * @throws YarnException 534 * @throws IOException 535 */ 536 @SuppressWarnings({ "unchecked" }) 537 public void run() throws YarnException, IOException, InterruptedException { 538 LOG.info("Starting ApplicationMaster"); 539 540 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 541 // are marked as LimitedPrivate 542 Credentials credentials = 543 UserGroupInformation.getCurrentUser().getCredentials(); 544 DataOutputBuffer dob = new DataOutputBuffer(); 545 credentials.writeTokenStorageToStream(dob); 546 // Now remove the AM->RM token so that containers cannot access it. 547 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 548 LOG.info("Executing with tokens:"); 549 while (iter.hasNext()) { 550 Token<?> token = iter.next(); 551 LOG.info(token); 552 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 553 iter.remove(); 554 } 555 } 556 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 557 558 // Create appSubmitterUgi and add original tokens to it 559 String appSubmitterUserName = 560 System.getenv(ApplicationConstants.Environment.USER.name()); 561 appSubmitterUgi = 562 UserGroupInformation.createRemoteUser(appSubmitterUserName); 563 appSubmitterUgi.addCredentials(credentials); 564 565 566 AMRMClientAsync.AbstractCallbackHandler allocListener = 567 new RMCallbackHandler(); 568 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 569 amRMClient.init(conf); 570 amRMClient.start(); 571 572 containerListener = createNMCallbackHandler(); 573 nmClientAsync = new NMClientAsyncImpl(containerListener); 574 nmClientAsync.init(conf); 575 nmClientAsync.start(); 576 577 startTimelineClient(conf); 578 if(timelineClient != null) { 579 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 580 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 581 } 582 583 // Setup local RPC Server to accept status requests directly from clients 584 // TODO need to setup a protocol for client to be able to communicate to 585 // the RPC server 586 // TODO use the rpc port info to register with the RM for the client to 587 // send requests to this app master 588 589 // Register self with ResourceManager 590 // This will start heartbeating to the RM 591 appMasterHostname = NetUtils.getHostname(); 592 RegisterApplicationMasterResponse response = amRMClient 593 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 594 appMasterTrackingUrl); 595 // Dump out information about cluster capability as seen by the 596 // resource manager 597 long maxMem = response.getMaximumResourceCapability().getMemorySize(); 598 LOG.info("Max mem capability of resources in this cluster " + maxMem); 599 600 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 601 LOG.info("Max vcores capability of resources in this cluster " + maxVCores); 602 603 // A resource ask cannot exceed the max. 604 if (containerMemory > maxMem) { 605 LOG.info("Container memory specified above max threshold of cluster." 606 + " Using max value." + ", specified=" + containerMemory + ", max=" 607 + maxMem); 608 containerMemory = maxMem; 609 } 610 611 if (containerVirtualCores > maxVCores) { 612 LOG.info("Container virtual cores specified above max threshold of cluster." 613 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 614 + maxVCores); 615 containerVirtualCores = maxVCores; 616 } 617 618 List<Container> previousAMRunningContainers = 619 response.getContainersFromPreviousAttempts(); 620 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 621 + " previous attempts' running containers on AM registration."); 622 for(Container container: previousAMRunningContainers) { 623 launchedContainers.add(container.getId()); 624 } 625 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 626 627 628 int numTotalContainersToRequest = 629 numTotalContainers - previousAMRunningContainers.size(); 630 // Setup ask for containers from RM 631 // Send request for containers to RM 632 // Until we get our fully allocated quota, we keep on polling RM for 633 // containers 634 // Keep looping until all the containers are launched and shell script 635 // executed on them ( regardless of success/failure). 636 for (int i = 0; i < numTotalContainersToRequest; ++i) { 637 ContainerRequest containerAsk = setupContainerAskForRM(); 638 amRMClient.addContainerRequest(containerAsk); 639 } 640 numRequestedContainers.set(numTotalContainers); 641 } 642 643 @VisibleForTesting 644 void startTimelineClient(final Configuration conf) 645 throws YarnException, IOException, InterruptedException { 646 try { 647 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 648 @Override 649 public Void run() throws Exception { 650 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 651 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 652 // Creating the Timeline Client 653 timelineClient = TimelineClient.createTimelineClient(); 654 timelineClient.init(conf); 655 timelineClient.start(); 656 } else { 657 timelineClient = null; 658 LOG.warn("Timeline service is not enabled"); 659 } 660 return null; 661 } 662 }); 663 } catch (UndeclaredThrowableException e) { 664 throw new YarnException(e.getCause()); 665 } 666 } 667 668 @VisibleForTesting 669 NMCallbackHandler createNMCallbackHandler() { 670 return new NMCallbackHandler(this); 671 } 672 673 @VisibleForTesting 674 protected boolean finish() { 675 // wait for completion. 676 while (!done 677 && (numCompletedContainers.get() != numTotalContainers)) { 678 try { 679 Thread.sleep(200); 680 } catch (InterruptedException ex) {} 681 } 682 683 if(timelineClient != null) { 684 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 685 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 686 } 687 688 // Join all launched threads 689 // needed for when we time out 690 // and we need to release containers 691 for (Thread launchThread : launchThreads) { 692 try { 693 launchThread.join(10000); 694 } catch (InterruptedException e) { 695 LOG.info("Exception thrown in thread join: " + e.getMessage()); 696 e.printStackTrace(); 697 } 698 } 699 700 // When the application completes, it should stop all running containers 701 LOG.info("Application completed. Stopping running containers"); 702 nmClientAsync.stop(); 703 704 // When the application completes, it should send a finish application 705 // signal to the RM 706 LOG.info("Application completed. Signalling finish to RM"); 707 708 FinalApplicationStatus appStatus; 709 String appMessage = null; 710 boolean success = true; 711 if (numFailedContainers.get() == 0 && 712 numCompletedContainers.get() == numTotalContainers) { 713 appStatus = FinalApplicationStatus.SUCCEEDED; 714 } else { 715 appStatus = FinalApplicationStatus.FAILED; 716 appMessage = "Diagnostics." + ", total=" + numTotalContainers 717 + ", completed=" + numCompletedContainers.get() + ", allocated=" 718 + numAllocatedContainers.get() + ", failed=" 719 + numFailedContainers.get(); 720 LOG.info(appMessage); 721 success = false; 722 } 723 try { 724 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 725 } catch (YarnException ex) { 726 LOG.error("Failed to unregister application", ex); 727 } catch (IOException e) { 728 LOG.error("Failed to unregister application", e); 729 } 730 731 amRMClient.stop(); 732 733 // Stop Timeline Client 734 if(timelineClient != null) { 735 timelineClient.stop(); 736 } 737 738 return success; 739 } 740 741 @VisibleForTesting 742 class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { 743 @SuppressWarnings("unchecked") 744 @Override 745 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 746 LOG.info("Got response from RM for container ask, completedCnt=" 747 + completedContainers.size()); 748 for (ContainerStatus containerStatus : completedContainers) { 749 LOG.info(appAttemptID + " got container status for containerID=" 750 + containerStatus.getContainerId() + ", state=" 751 + containerStatus.getState() + ", exitStatus=" 752 + containerStatus.getExitStatus() + ", diagnostics=" 753 + containerStatus.getDiagnostics()); 754 755 // non complete containers should not be here 756 assert (containerStatus.getState() == ContainerState.COMPLETE); 757 // ignore containers we know nothing about - probably from a previous 758 // attempt 759 if (!launchedContainers.contains(containerStatus.getContainerId())) { 760 LOG.info("Ignoring completed status of " 761 + containerStatus.getContainerId() 762 + "; unknown container(probably launched by previous attempt)"); 763 continue; 764 } 765 766 // increment counters for completed/failed containers 767 int exitStatus = containerStatus.getExitStatus(); 768 if (0 != exitStatus) { 769 // container failed 770 if (ContainerExitStatus.ABORTED != exitStatus) { 771 // shell script failed 772 // counts as completed 773 numCompletedContainers.incrementAndGet(); 774 numFailedContainers.incrementAndGet(); 775 } else { 776 // container was killed by framework, possibly preempted 777 // we should re-try as the container was lost for some reason 778 numAllocatedContainers.decrementAndGet(); 779 numRequestedContainers.decrementAndGet(); 780 // we do not need to release the container as it would be done 781 // by the RM 782 } 783 } else { 784 // nothing to do 785 // container completed successfully 786 numCompletedContainers.incrementAndGet(); 787 LOG.info("Container completed successfully." + ", containerId=" 788 + containerStatus.getContainerId()); 789 } 790 if(timelineClient != null) { 791 publishContainerEndEvent( 792 timelineClient, containerStatus, domainId, appSubmitterUgi); 793 } 794 } 795 796 // ask for more containers if any failed 797 int askCount = numTotalContainers - numRequestedContainers.get(); 798 numRequestedContainers.addAndGet(askCount); 799 800 if (askCount > 0) { 801 for (int i = 0; i < askCount; ++i) { 802 ContainerRequest containerAsk = setupContainerAskForRM(); 803 amRMClient.addContainerRequest(containerAsk); 804 } 805 } 806 807 if (numCompletedContainers.get() == numTotalContainers) { 808 done = true; 809 } 810 } 811 812 @Override 813 public void onContainersAllocated(List<Container> allocatedContainers) { 814 LOG.info("Got response from RM for container ask, allocatedCnt=" 815 + allocatedContainers.size()); 816 numAllocatedContainers.addAndGet(allocatedContainers.size()); 817 for (Container allocatedContainer : allocatedContainers) { 818 String yarnShellId = Integer.toString(yarnShellIdCounter); 819 yarnShellIdCounter++; 820 LOG.info("Launching shell command on a new container." 821 + ", containerId=" + allocatedContainer.getId() 822 + ", yarnShellId=" + yarnShellId 823 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 824 + ":" + allocatedContainer.getNodeId().getPort() 825 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 826 + ", containerResourceMemory" 827 + allocatedContainer.getResource().getMemorySize() 828 + ", containerResourceVirtualCores" 829 + allocatedContainer.getResource().getVirtualCores()); 830 // + ", containerToken" 831 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 832 833 Thread launchThread = createLaunchContainerThread(allocatedContainer, 834 yarnShellId); 835 836 // launch and start the container on a separate thread to keep 837 // the main thread unblocked 838 // as all containers may not be allocated at one go. 839 launchThreads.add(launchThread); 840 launchedContainers.add(allocatedContainer.getId()); 841 launchThread.start(); 842 } 843 } 844 845 @Override 846 public void onContainersUpdated( 847 List<UpdatedContainer> containers) {} 848 849 @Override 850 public void onShutdownRequest() { 851 done = true; 852 } 853 854 @Override 855 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 856 857 @Override 858 public float getProgress() { 859 // set progress to deliver to RM on next heartbeat 860 float progress = (float) numCompletedContainers.get() 861 / numTotalContainers; 862 return progress; 863 } 864 865 @Override 866 public void onError(Throwable e) { 867 LOG.error("Error in RMCallbackHandler: ", e); 868 done = true; 869 amRMClient.stop(); 870 } 871 } 872 873 @VisibleForTesting 874 static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { 875 876 private ConcurrentMap<ContainerId, Container> containers = 877 new ConcurrentHashMap<ContainerId, Container>(); 878 private final ApplicationMaster applicationMaster; 879 880 public NMCallbackHandler(ApplicationMaster applicationMaster) { 881 this.applicationMaster = applicationMaster; 882 } 883 884 public void addContainer(ContainerId containerId, Container container) { 885 containers.putIfAbsent(containerId, container); 886 } 887 888 @Override 889 public void onContainerStopped(ContainerId containerId) { 890 if (LOG.isDebugEnabled()) { 891 LOG.debug("Succeeded to stop Container " + containerId); 892 } 893 containers.remove(containerId); 894 } 895 896 @Override 897 public void onContainerStatusReceived(ContainerId containerId, 898 ContainerStatus containerStatus) { 899 if (LOG.isDebugEnabled()) { 900 LOG.debug("Container Status: id=" + containerId + ", status=" + 901 containerStatus); 902 } 903 } 904 905 @Override 906 public void onContainerStarted(ContainerId containerId, 907 Map<String, ByteBuffer> allServiceResponse) { 908 if (LOG.isDebugEnabled()) { 909 LOG.debug("Succeeded to start Container " + containerId); 910 } 911 Container container = containers.get(containerId); 912 if (container != null) { 913 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 914 } 915 if(applicationMaster.timelineClient != null) { 916 applicationMaster.publishContainerStartEvent( 917 applicationMaster.timelineClient, container, 918 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 919 } 920 } 921 922 @Override 923 public void onContainerResourceIncreased( 924 ContainerId containerId, Resource resource) {} 925 926 @Override 927 public void onStartContainerError(ContainerId containerId, Throwable t) { 928 LOG.error("Failed to start Container " + containerId); 929 containers.remove(containerId); 930 applicationMaster.numCompletedContainers.incrementAndGet(); 931 applicationMaster.numFailedContainers.incrementAndGet(); 932 } 933 934 @Override 935 public void onGetContainerStatusError( 936 ContainerId containerId, Throwable t) { 937 LOG.error("Failed to query the status of Container " + containerId); 938 } 939 940 @Override 941 public void onStopContainerError(ContainerId containerId, Throwable t) { 942 LOG.error("Failed to stop Container " + containerId); 943 containers.remove(containerId); 944 } 945 946 @Override 947 public void onIncreaseContainerResourceError( 948 ContainerId containerId, Throwable t) {} 949 950 } 951 952 /** 953 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 954 * that will execute the shell command. 955 */ 956 private class LaunchContainerRunnable implements Runnable { 957 958 // Allocated container 959 private Container container; 960 private String shellId; 961 962 NMCallbackHandler containerListener; 963 964 /** 965 * @param lcontainer Allocated container 966 * @param containerListener Callback handler of the container 967 */ 968 public LaunchContainerRunnable(Container lcontainer, 969 NMCallbackHandler containerListener, String shellId) { 970 this.container = lcontainer; 971 this.containerListener = containerListener; 972 this.shellId = shellId; 973 } 974 975 @Override 976 /** 977 * Connects to CM, sets up container launch context 978 * for shell command and eventually dispatches the container 979 * start request to the CM. 980 */ 981 public void run() { 982 LOG.info("Setting up container launch container for containerid=" 983 + container.getId() + " with shellid=" + shellId); 984 985 // Set the local resources 986 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 987 988 // The container for the eventual shell commands needs its own local 989 // resources too. 990 // In this scenario, if a shell script is specified, we need to have it 991 // copied and made available to the container. 992 if (!scriptPath.isEmpty()) { 993 Path renamedScriptPath = null; 994 if (Shell.WINDOWS) { 995 renamedScriptPath = new Path(scriptPath + ".bat"); 996 } else { 997 renamedScriptPath = new Path(scriptPath + ".sh"); 998 } 999 1000 try { 1001 // rename the script file based on the underlying OS syntax. 1002 renameScriptFile(renamedScriptPath); 1003 } catch (Exception e) { 1004 LOG.error( 1005 "Not able to add suffix (.bat/.sh) to the shell script filename", 1006 e); 1007 // We know we cannot continue launching the container 1008 // so we should release it. 1009 numCompletedContainers.incrementAndGet(); 1010 numFailedContainers.incrementAndGet(); 1011 return; 1012 } 1013 1014 URL yarnUrl = null; 1015 try { 1016 yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString())); 1017 } catch (URISyntaxException e) { 1018 LOG.error("Error when trying to use shell script path specified" 1019 + " in env, path=" + renamedScriptPath, e); 1020 // A failure scenario on bad input such as invalid shell script path 1021 // We know we cannot continue launching the container 1022 // so we should release it. 1023 // TODO 1024 numCompletedContainers.incrementAndGet(); 1025 numFailedContainers.incrementAndGet(); 1026 return; 1027 } 1028 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 1029 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1030 shellScriptPathLen, shellScriptPathTimestamp); 1031 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 1032 ExecShellStringPath, shellRsrc); 1033 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 1034 } 1035 1036 // Set the necessary command to execute on the allocated container 1037 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 1038 1039 // Set executable command 1040 vargs.add(shellCommand); 1041 // Set shell script path 1042 if (!scriptPath.isEmpty()) { 1043 vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath 1044 : ExecShellStringPath); 1045 } 1046 1047 // Set args for the shell command if any 1048 vargs.add(shellArgs); 1049 // Add log redirect params 1050 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1051 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1052 1053 // Get final commmand 1054 StringBuilder command = new StringBuilder(); 1055 for (CharSequence str : vargs) { 1056 command.append(str).append(" "); 1057 } 1058 1059 List<String> commands = new ArrayList<String>(); 1060 commands.add(command.toString()); 1061 1062 // Set up ContainerLaunchContext, setting local resource, environment, 1063 // command and token for constructor. 1064 1065 // Note for tokens: Set up tokens for the container too. Today, for normal 1066 // shell commands, the container in distribute-shell doesn't need any 1067 // tokens. We are populating them mainly for NodeManagers to be able to 1068 // download anyfiles in the distributed file-system. The tokens are 1069 // otherwise also useful in cases, for e.g., when one is running a 1070 // "hadoop dfs" command inside the distributed shell. 1071 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1072 myShellEnv.put(YARN_SHELL_ID, shellId); 1073 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1074 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1075 null); 1076 containerListener.addContainer(container.getId(), container); 1077 nmClientAsync.startContainerAsync(container, ctx); 1078 } 1079 } 1080 1081 private void renameScriptFile(final Path renamedScriptPath) 1082 throws IOException, InterruptedException { 1083 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1084 @Override 1085 public Void run() throws IOException { 1086 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1087 fs.rename(new Path(scriptPath), renamedScriptPath); 1088 return null; 1089 } 1090 }); 1091 LOG.info("User " + appSubmitterUgi.getUserName() 1092 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1093 } 1094 1095 /** 1096 * Setup the request that will be sent to the RM for the container ask. 1097 * 1098 * @return the setup ResourceRequest to be sent to RM 1099 */ 1100 private ContainerRequest setupContainerAskForRM() { 1101 // setup requirements for hosts 1102 // using * as any host will do for the distributed shell app 1103 // set the priority for the request 1104 // TODO - what is the range for priority? how to decide? 1105 Priority pri = Priority.newInstance(requestPriority); 1106 1107 // Set up resource type requirements 1108 // For now, memory and CPU are supported so we set memory and cpu requirements 1109 Resource capability = Resource.newInstance(containerMemory, 1110 containerVirtualCores); 1111 1112 ContainerRequest request = new ContainerRequest(capability, null, null, 1113 pri); 1114 LOG.info("Requested container ask: " + request.toString()); 1115 return request; 1116 } 1117 1118 private boolean fileExist(String filePath) { 1119 return new File(filePath).exists(); 1120 } 1121 1122 private String readContent(String filePath) throws IOException { 1123 DataInputStream ds = null; 1124 try { 1125 ds = new DataInputStream(new FileInputStream(filePath)); 1126 return ds.readUTF(); 1127 } finally { 1128 org.apache.commons.io.IOUtils.closeQuietly(ds); 1129 } 1130 } 1131 1132 private void publishContainerStartEvent( 1133 final TimelineClient timelineClient, final Container container, 1134 String domainId, UserGroupInformation ugi) { 1135 final TimelineEntity entity = new TimelineEntity(); 1136 entity.setEntityId(container.getId().toString()); 1137 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1138 entity.setDomainId(domainId); 1139 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1140 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId() 1141 .getApplicationAttemptId().getApplicationId().toString()); 1142 TimelineEvent event = new TimelineEvent(); 1143 event.setTimestamp(System.currentTimeMillis()); 1144 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1145 event.addEventInfo("Node", container.getNodeId().toString()); 1146 event.addEventInfo("Resources", container.getResource().toString()); 1147 entity.addEvent(event); 1148 1149 try { 1150 processTimelineResponseErrors( 1151 putContainerEntity(timelineClient, 1152 container.getId().getApplicationAttemptId(), 1153 entity)); 1154 } catch (YarnException | IOException | ClientHandlerException e) { 1155 LOG.error("Container start event could not be published for " 1156 + container.getId().toString(), e); 1157 } 1158 } 1159 1160 @VisibleForTesting 1161 void publishContainerEndEvent( 1162 final TimelineClient timelineClient, ContainerStatus container, 1163 String domainId, UserGroupInformation ugi) { 1164 final TimelineEntity entity = new TimelineEntity(); 1165 entity.setEntityId(container.getContainerId().toString()); 1166 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1167 entity.setDomainId(domainId); 1168 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1169 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, 1170 container.getContainerId().getApplicationAttemptId() 1171 .getApplicationId().toString()); 1172 TimelineEvent event = new TimelineEvent(); 1173 event.setTimestamp(System.currentTimeMillis()); 1174 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1175 event.addEventInfo("State", container.getState().name()); 1176 event.addEventInfo("Exit Status", container.getExitStatus()); 1177 entity.addEvent(event); 1178 try { 1179 processTimelineResponseErrors( 1180 putContainerEntity(timelineClient, 1181 container.getContainerId().getApplicationAttemptId(), 1182 entity)); 1183 } catch (YarnException | IOException | ClientHandlerException e) { 1184 LOG.error("Container end event could not be published for " 1185 + container.getContainerId().toString(), e); 1186 } 1187 } 1188 1189 private TimelinePutResponse putContainerEntity( 1190 TimelineClient timelineClient, ApplicationAttemptId currAttemptId, 1191 TimelineEntity entity) 1192 throws YarnException, IOException { 1193 if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { 1194 TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance( 1195 currAttemptId.getApplicationId(), 1196 CONTAINER_ENTITY_GROUP_ID); 1197 return timelineClient.putEntities(currAttemptId, groupId, entity); 1198 } else { 1199 return timelineClient.putEntities(entity); 1200 } 1201 } 1202 1203 private void publishApplicationAttemptEvent( 1204 final TimelineClient timelineClient, String appAttemptId, 1205 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1206 final TimelineEntity entity = new TimelineEntity(); 1207 entity.setEntityId(appAttemptId); 1208 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1209 entity.setDomainId(domainId); 1210 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1211 TimelineEvent event = new TimelineEvent(); 1212 event.setEventType(appEvent.toString()); 1213 event.setTimestamp(System.currentTimeMillis()); 1214 entity.addEvent(event); 1215 try { 1216 TimelinePutResponse response = timelineClient.putEntities(entity); 1217 processTimelineResponseErrors(response); 1218 } catch (YarnException | IOException | ClientHandlerException e) { 1219 LOG.error("App Attempt " 1220 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1221 + " event could not be published for " 1222 + appAttemptId.toString(), e); 1223 } 1224 } 1225 1226 private TimelinePutResponse processTimelineResponseErrors( 1227 TimelinePutResponse response) { 1228 List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); 1229 if (errors.size() == 0) { 1230 LOG.debug("Timeline entities are successfully put"); 1231 } else { 1232 for (TimelinePutResponse.TimelinePutError error : errors) { 1233 LOG.error( 1234 "Error when publishing entity [" + error.getEntityType() + "," 1235 + error.getEntityId() + "], server side error code: " 1236 + error.getErrorCode()); 1237 } 1238 } 1239 return response; 1240 } 1241 1242 RMCallbackHandler getRMCallbackHandler() { 1243 return new RMCallbackHandler(); 1244 } 1245 1246 @VisibleForTesting 1247 void setAmRMClient(AMRMClientAsync client) { 1248 this.amRMClient = client; 1249 } 1250 1251 @VisibleForTesting 1252 int getNumCompletedContainers() { 1253 return numCompletedContainers.get(); 1254 } 1255 1256 @VisibleForTesting 1257 boolean getDone() { 1258 return done; 1259 } 1260 1261 @VisibleForTesting 1262 Thread createLaunchContainerThread(Container allocatedContainer, 1263 String shellId) { 1264 LaunchContainerRunnable runnableLaunchContainer = 1265 new LaunchContainerRunnable(allocatedContainer, containerListener, 1266 shellId); 1267 return new Thread(runnableLaunchContainer); 1268 } 1269}