001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.applications.distributedshell; 020 021 import java.io.BufferedReader; 022 import java.io.DataInputStream; 023 import java.io.File; 024 import java.io.FileInputStream; 025 import java.io.IOException; 026 import java.io.StringReader; 027 import java.net.URI; 028 import java.net.URISyntaxException; 029 import java.nio.ByteBuffer; 030 import java.security.PrivilegedExceptionAction; 031 import java.util.ArrayList; 032 import java.util.HashMap; 033 import java.util.Iterator; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Vector; 037 import java.util.concurrent.ConcurrentHashMap; 038 import java.util.concurrent.ConcurrentMap; 039 import java.util.concurrent.atomic.AtomicInteger; 040 041 import org.apache.commons.cli.CommandLine; 042 import org.apache.commons.cli.GnuParser; 043 import org.apache.commons.cli.HelpFormatter; 044 import org.apache.commons.cli.Options; 045 import org.apache.commons.cli.ParseException; 046 import org.apache.commons.logging.Log; 047 import org.apache.commons.logging.LogFactory; 048 import org.apache.hadoop.classification.InterfaceAudience; 049 import org.apache.hadoop.classification.InterfaceAudience.Private; 050 import org.apache.hadoop.classification.InterfaceStability; 051 import org.apache.hadoop.conf.Configuration; 052 import org.apache.hadoop.fs.FileSystem; 053 import org.apache.hadoop.fs.Path; 054 import org.apache.hadoop.io.DataOutputBuffer; 055 import org.apache.hadoop.io.IOUtils; 056 import org.apache.hadoop.net.NetUtils; 057 import org.apache.hadoop.security.Credentials; 058 import org.apache.hadoop.security.UserGroupInformation; 059 import org.apache.hadoop.security.token.Token; 060 import org.apache.hadoop.util.ExitUtil; 061 import org.apache.hadoop.util.Shell; 062 import org.apache.hadoop.yarn.api.ApplicationConstants; 063 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 064 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 065 import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 066 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 067 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 068 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 069 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 070 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 071 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 072 import org.apache.hadoop.yarn.api.records.Container; 073 import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 074 import org.apache.hadoop.yarn.api.records.ContainerId; 075 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 076 import org.apache.hadoop.yarn.api.records.ContainerState; 077 import org.apache.hadoop.yarn.api.records.ContainerStatus; 078 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 079 import org.apache.hadoop.yarn.api.records.LocalResource; 080 import org.apache.hadoop.yarn.api.records.LocalResourceType; 081 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 082 import org.apache.hadoop.yarn.api.records.NodeReport; 083 import org.apache.hadoop.yarn.api.records.Priority; 084 import org.apache.hadoop.yarn.api.records.Resource; 085 import org.apache.hadoop.yarn.api.records.ResourceRequest; 086 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 087 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 088 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 089 import org.apache.hadoop.yarn.client.api.TimelineClient; 090 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 091 import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 092 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 093 import org.apache.hadoop.yarn.conf.YarnConfiguration; 094 import org.apache.hadoop.yarn.exceptions.YarnException; 095 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 096 import org.apache.hadoop.yarn.util.ConverterUtils; 097 import org.apache.hadoop.yarn.util.Records; 098 import org.apache.log4j.LogManager; 099 100 import com.google.common.annotations.VisibleForTesting; 101 102 /** 103 * An ApplicationMaster for executing shell commands on a set of launched 104 * containers using the YARN framework. 105 * 106 * <p> 107 * This class is meant to act as an example on how to write yarn-based 108 * application masters. 109 * </p> 110 * 111 * <p> 112 * The ApplicationMaster is started on a container by the 113 * <code>ResourceManager</code>'s launcher. The first thing that the 114 * <code>ApplicationMaster</code> needs to do is to connect and register itself 115 * with the <code>ResourceManager</code>. The registration sets up information 116 * within the <code>ResourceManager</code> regarding what host:port the 117 * ApplicationMaster is listening on to provide any form of functionality to a 118 * client as well as a tracking url that a client can use to keep track of 119 * status/job history if needed. However, in the distributedshell, trackingurl 120 * and appMasterHost:appMasterRpcPort are not supported. 121 * </p> 122 * 123 * <p> 124 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 125 * <code>ResourceManager</code> at regular intervals to inform the 126 * <code>ResourceManager</code> that it is up and alive. The 127 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 128 * <code>ApplicationMaster</code> acts as a heartbeat. 129 * 130 * <p> 131 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 132 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 133 * required no. of containers using {@link ResourceRequest} with the necessary 134 * resource specifications such as node location, computational 135 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 136 * responds with an {@link AllocateResponse} that informs the 137 * <code>ApplicationMaster</code> of the set of newly allocated containers, 138 * completed containers as well as current state of available resources. 139 * </p> 140 * 141 * <p> 142 * For each allocated container, the <code>ApplicationMaster</code> can then set 143 * up the necessary launch context via {@link ContainerLaunchContext} to specify 144 * the allocated container id, local resources required by the executable, the 145 * environment to be setup for the executable, commands to execute, etc. and 146 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 147 * launch and execute the defined commands on the given allocated container. 148 * </p> 149 * 150 * <p> 151 * The <code>ApplicationMaster</code> can monitor the launched container by 152 * either querying the <code>ResourceManager</code> using 153 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 154 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 155 * container's {@link ContainerId}. 156 * 157 * <p> 158 * After the job has been completed, the <code>ApplicationMaster</code> has to 159 * send a {@link FinishApplicationMasterRequest} to the 160 * <code>ResourceManager</code> to inform it that the 161 * <code>ApplicationMaster</code> has been completed. 162 */ 163 @InterfaceAudience.Public 164 @InterfaceStability.Unstable 165 public class ApplicationMaster { 166 167 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 168 169 @VisibleForTesting 170 @Private 171 public static enum DSEvent { 172 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 173 } 174 175 @VisibleForTesting 176 @Private 177 public static enum DSEntity { 178 DS_APP_ATTEMPT, DS_CONTAINER 179 } 180 181 // Configuration 182 private Configuration conf; 183 184 // Handle to communicate with the Resource Manager 185 @SuppressWarnings("rawtypes") 186 private AMRMClientAsync amRMClient; 187 188 // In both secure and non-secure modes, this points to the job-submitter. 189 private UserGroupInformation appSubmitterUgi; 190 191 // Handle to communicate with the Node Manager 192 private NMClientAsync nmClientAsync; 193 // Listen to process the response from the Node Manager 194 private NMCallbackHandler containerListener; 195 196 // Application Attempt Id ( combination of attemptId and fail count ) 197 @VisibleForTesting 198 protected ApplicationAttemptId appAttemptID; 199 200 // TODO 201 // For status update for clients - yet to be implemented 202 // Hostname of the container 203 private String appMasterHostname = ""; 204 // Port on which the app master listens for status updates from clients 205 private int appMasterRpcPort = -1; 206 // Tracking url to which app master publishes info for clients to monitor 207 private String appMasterTrackingUrl = ""; 208 209 // App Master configuration 210 // No. of containers to run shell command on 211 private int numTotalContainers = 1; 212 // Memory to request for the container on which the shell command will run 213 private int containerMemory = 10; 214 // VirtualCores to request for the container on which the shell command will run 215 private int containerVirtualCores = 1; 216 // Priority of the request 217 private int requestPriority; 218 219 // Counter for completed containers ( complete denotes successful or failed ) 220 private AtomicInteger numCompletedContainers = new AtomicInteger(); 221 // Allocated container count so that we know how many containers has the RM 222 // allocated to us 223 @VisibleForTesting 224 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 225 // Count of failed containers 226 private AtomicInteger numFailedContainers = new AtomicInteger(); 227 // Count of containers already requested from the RM 228 // Needed as once requested, we should not request for containers again. 229 // Only request for more if the original requirement changes. 230 @VisibleForTesting 231 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 232 233 // Shell command to be executed 234 private String shellCommand = ""; 235 // Args to be passed to the shell command 236 private String shellArgs = ""; 237 // Env variables to be setup for the shell command 238 private Map<String, String> shellEnv = new HashMap<String, String>(); 239 240 // Location of shell script ( obtained from info set in env ) 241 // Shell script path in fs 242 private String scriptPath = ""; 243 // Timestamp needed for creating a local resource 244 private long shellScriptPathTimestamp = 0; 245 // File length needed for local resource 246 private long shellScriptPathLen = 0; 247 248 // Hardcoded path to shell script in launch container's local env 249 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; 250 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH 251 + ".bat"; 252 253 // Hardcoded path to custom log_properties 254 private static final String log4jPath = "log4j.properties"; 255 256 private static final String shellCommandPath = "shellCommands"; 257 private static final String shellArgsPath = "shellArgs"; 258 259 private volatile boolean done; 260 261 private ByteBuffer allTokens; 262 263 // Launch threads 264 private List<Thread> launchThreads = new ArrayList<Thread>(); 265 266 // Timeline Client 267 private TimelineClient timelineClient; 268 269 private final String linux_bash_command = "bash"; 270 private final String windows_command = "cmd /c"; 271 272 /** 273 * @param args Command line args 274 */ 275 public static void main(String[] args) { 276 boolean result = false; 277 try { 278 ApplicationMaster appMaster = new ApplicationMaster(); 279 LOG.info("Initializing ApplicationMaster"); 280 boolean doRun = appMaster.init(args); 281 if (!doRun) { 282 System.exit(0); 283 } 284 appMaster.run(); 285 result = appMaster.finish(); 286 } catch (Throwable t) { 287 LOG.fatal("Error running ApplicationMaster", t); 288 LogManager.shutdown(); 289 ExitUtil.terminate(1, t); 290 } 291 if (result) { 292 LOG.info("Application Master completed successfully. exiting"); 293 System.exit(0); 294 } else { 295 LOG.info("Application Master failed. exiting"); 296 System.exit(2); 297 } 298 } 299 300 /** 301 * Dump out contents of $CWD and the environment to stdout for debugging 302 */ 303 private void dumpOutDebugInfo() { 304 305 LOG.info("Dump debug output"); 306 Map<String, String> envs = System.getenv(); 307 for (Map.Entry<String, String> env : envs.entrySet()) { 308 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 309 System.out.println("System env: key=" + env.getKey() + ", val=" 310 + env.getValue()); 311 } 312 313 BufferedReader buf = null; 314 try { 315 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : 316 Shell.execCommand("ls", "-al"); 317 buf = new BufferedReader(new StringReader(lines)); 318 String line = ""; 319 while ((line = buf.readLine()) != null) { 320 LOG.info("System CWD content: " + line); 321 System.out.println("System CWD content: " + line); 322 } 323 } catch (IOException e) { 324 e.printStackTrace(); 325 } finally { 326 IOUtils.cleanup(LOG, buf); 327 } 328 } 329 330 public ApplicationMaster() { 331 // Set up the configuration 332 conf = new YarnConfiguration(); 333 } 334 335 /** 336 * Parse command line options 337 * 338 * @param args Command line args 339 * @return Whether init successful and run should be invoked 340 * @throws ParseException 341 * @throws IOException 342 */ 343 public boolean init(String[] args) throws ParseException, IOException { 344 Options opts = new Options(); 345 opts.addOption("app_attempt_id", true, 346 "App Attempt ID. Not to be used unless for testing purposes"); 347 opts.addOption("shell_env", true, 348 "Environment for shell script. Specified as env_key=env_val pairs"); 349 opts.addOption("container_memory", true, 350 "Amount of memory in MB to be requested to run the shell command"); 351 opts.addOption("container_vcores", true, 352 "Amount of virtual cores to be requested to run the shell command"); 353 opts.addOption("num_containers", true, 354 "No. of containers on which the shell command needs to be executed"); 355 opts.addOption("priority", true, "Application Priority. Default 0"); 356 opts.addOption("debug", false, "Dump out debug information"); 357 358 opts.addOption("help", false, "Print usage"); 359 CommandLine cliParser = new GnuParser().parse(opts, args); 360 361 if (args.length == 0) { 362 printUsage(opts); 363 throw new IllegalArgumentException( 364 "No args specified for application master to initialize"); 365 } 366 367 //Check whether customer log4j.properties file exists 368 if (fileExist(log4jPath)) { 369 try { 370 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 371 log4jPath); 372 } catch (Exception e) { 373 LOG.warn("Can not set up custom log4j properties. " + e); 374 } 375 } 376 377 if (cliParser.hasOption("help")) { 378 printUsage(opts); 379 return false; 380 } 381 382 if (cliParser.hasOption("debug")) { 383 dumpOutDebugInfo(); 384 } 385 386 Map<String, String> envs = System.getenv(); 387 388 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 389 if (cliParser.hasOption("app_attempt_id")) { 390 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 391 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 392 } else { 393 throw new IllegalArgumentException( 394 "Application Attempt Id not set in the environment"); 395 } 396 } else { 397 ContainerId containerId = ConverterUtils.toContainerId(envs 398 .get(Environment.CONTAINER_ID.name())); 399 appAttemptID = containerId.getApplicationAttemptId(); 400 } 401 402 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 403 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 404 + " not set in the environment"); 405 } 406 if (!envs.containsKey(Environment.NM_HOST.name())) { 407 throw new RuntimeException(Environment.NM_HOST.name() 408 + " not set in the environment"); 409 } 410 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 411 throw new RuntimeException(Environment.NM_HTTP_PORT 412 + " not set in the environment"); 413 } 414 if (!envs.containsKey(Environment.NM_PORT.name())) { 415 throw new RuntimeException(Environment.NM_PORT.name() 416 + " not set in the environment"); 417 } 418 419 LOG.info("Application master for app" + ", appId=" 420 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 421 + appAttemptID.getApplicationId().getClusterTimestamp() 422 + ", attemptId=" + appAttemptID.getAttemptId()); 423 424 if (!fileExist(shellCommandPath) 425 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 426 throw new IllegalArgumentException( 427 "No shell command or shell script specified to be executed by application master"); 428 } 429 430 if (fileExist(shellCommandPath)) { 431 shellCommand = readContent(shellCommandPath); 432 } 433 434 if (fileExist(shellArgsPath)) { 435 shellArgs = readContent(shellArgsPath); 436 } 437 438 if (cliParser.hasOption("shell_env")) { 439 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 440 for (String env : shellEnvs) { 441 env = env.trim(); 442 int index = env.indexOf('='); 443 if (index == -1) { 444 shellEnv.put(env, ""); 445 continue; 446 } 447 String key = env.substring(0, index); 448 String val = ""; 449 if (index < (env.length() - 1)) { 450 val = env.substring(index + 1); 451 } 452 shellEnv.put(key, val); 453 } 454 } 455 456 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 457 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 458 459 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 460 shellScriptPathTimestamp = Long.valueOf(envs 461 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 462 } 463 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 464 shellScriptPathLen = Long.valueOf(envs 465 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 466 } 467 468 if (!scriptPath.isEmpty() 469 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 470 LOG.error("Illegal values in env for shell script path" + ", path=" 471 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 472 + shellScriptPathTimestamp); 473 throw new IllegalArgumentException( 474 "Illegal values in env for shell script path"); 475 } 476 } 477 478 containerMemory = Integer.parseInt(cliParser.getOptionValue( 479 "container_memory", "10")); 480 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 481 "container_vcores", "1")); 482 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 483 "num_containers", "1")); 484 if (numTotalContainers == 0) { 485 throw new IllegalArgumentException( 486 "Cannot run distributed shell with no containers"); 487 } 488 requestPriority = Integer.parseInt(cliParser 489 .getOptionValue("priority", "0")); 490 491 // Creating the Timeline Client 492 timelineClient = TimelineClient.createTimelineClient(); 493 timelineClient.init(conf); 494 timelineClient.start(); 495 496 return true; 497 } 498 499 /** 500 * Helper function to print usage 501 * 502 * @param opts Parsed command line options 503 */ 504 private void printUsage(Options opts) { 505 new HelpFormatter().printHelp("ApplicationMaster", opts); 506 } 507 508 /** 509 * Main run function for the application master 510 * 511 * @throws YarnException 512 * @throws IOException 513 */ 514 @SuppressWarnings({ "unchecked" }) 515 public void run() throws YarnException, IOException { 516 LOG.info("Starting ApplicationMaster"); 517 try { 518 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 519 DSEvent.DS_APP_ATTEMPT_START); 520 } catch (Exception e) { 521 LOG.error("App Attempt start event coud not be pulished for " 522 + appAttemptID.toString(), e); 523 } 524 525 Credentials credentials = 526 UserGroupInformation.getCurrentUser().getCredentials(); 527 DataOutputBuffer dob = new DataOutputBuffer(); 528 credentials.writeTokenStorageToStream(dob); 529 // Now remove the AM->RM token so that containers cannot access it. 530 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 531 LOG.info("Executing with tokens:"); 532 while (iter.hasNext()) { 533 Token<?> token = iter.next(); 534 LOG.info(token); 535 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 536 iter.remove(); 537 } 538 } 539 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 540 541 // Create appSubmitterUgi and add original tokens to it 542 String appSubmitterUserName = 543 System.getenv(ApplicationConstants.Environment.USER.name()); 544 appSubmitterUgi = 545 UserGroupInformation.createRemoteUser(appSubmitterUserName); 546 appSubmitterUgi.addCredentials(credentials); 547 548 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 549 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 550 amRMClient.init(conf); 551 amRMClient.start(); 552 553 containerListener = createNMCallbackHandler(); 554 nmClientAsync = new NMClientAsyncImpl(containerListener); 555 nmClientAsync.init(conf); 556 nmClientAsync.start(); 557 558 // Setup local RPC Server to accept status requests directly from clients 559 // TODO need to setup a protocol for client to be able to communicate to 560 // the RPC server 561 // TODO use the rpc port info to register with the RM for the client to 562 // send requests to this app master 563 564 // Register self with ResourceManager 565 // This will start heartbeating to the RM 566 appMasterHostname = NetUtils.getHostname(); 567 RegisterApplicationMasterResponse response = amRMClient 568 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 569 appMasterTrackingUrl); 570 // Dump out information about cluster capability as seen by the 571 // resource manager 572 int maxMem = response.getMaximumResourceCapability().getMemory(); 573 LOG.info("Max mem capabililty of resources in this cluster " + maxMem); 574 575 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 576 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); 577 578 // A resource ask cannot exceed the max. 579 if (containerMemory > maxMem) { 580 LOG.info("Container memory specified above max threshold of cluster." 581 + " Using max value." + ", specified=" + containerMemory + ", max=" 582 + maxMem); 583 containerMemory = maxMem; 584 } 585 586 if (containerVirtualCores > maxVCores) { 587 LOG.info("Container virtual cores specified above max threshold of cluster." 588 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 589 + maxVCores); 590 containerVirtualCores = maxVCores; 591 } 592 593 List<Container> previousAMRunningContainers = 594 response.getContainersFromPreviousAttempts(); 595 LOG.info("Received " + previousAMRunningContainers.size() 596 + " previous AM's running containers on AM registration."); 597 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 598 599 int numTotalContainersToRequest = 600 numTotalContainers - previousAMRunningContainers.size(); 601 // Setup ask for containers from RM 602 // Send request for containers to RM 603 // Until we get our fully allocated quota, we keep on polling RM for 604 // containers 605 // Keep looping until all the containers are launched and shell script 606 // executed on them ( regardless of success/failure). 607 for (int i = 0; i < numTotalContainersToRequest; ++i) { 608 ContainerRequest containerAsk = setupContainerAskForRM(); 609 amRMClient.addContainerRequest(containerAsk); 610 } 611 numRequestedContainers.set(numTotalContainersToRequest); 612 try { 613 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 614 DSEvent.DS_APP_ATTEMPT_END); 615 } catch (Exception e) { 616 LOG.error("App Attempt start event coud not be pulished for " 617 + appAttemptID.toString(), e); 618 } 619 } 620 621 @VisibleForTesting 622 NMCallbackHandler createNMCallbackHandler() { 623 return new NMCallbackHandler(this); 624 } 625 626 @VisibleForTesting 627 protected boolean finish() { 628 // wait for completion. 629 while (!done 630 && (numCompletedContainers.get() != numTotalContainers)) { 631 try { 632 Thread.sleep(200); 633 } catch (InterruptedException ex) {} 634 } 635 636 // Join all launched threads 637 // needed for when we time out 638 // and we need to release containers 639 for (Thread launchThread : launchThreads) { 640 try { 641 launchThread.join(10000); 642 } catch (InterruptedException e) { 643 LOG.info("Exception thrown in thread join: " + e.getMessage()); 644 e.printStackTrace(); 645 } 646 } 647 648 // When the application completes, it should stop all running containers 649 LOG.info("Application completed. Stopping running containers"); 650 nmClientAsync.stop(); 651 652 // When the application completes, it should send a finish application 653 // signal to the RM 654 LOG.info("Application completed. Signalling finish to RM"); 655 656 FinalApplicationStatus appStatus; 657 String appMessage = null; 658 boolean success = true; 659 if (numFailedContainers.get() == 0 && 660 numCompletedContainers.get() == numTotalContainers) { 661 appStatus = FinalApplicationStatus.SUCCEEDED; 662 } else { 663 appStatus = FinalApplicationStatus.FAILED; 664 appMessage = "Diagnostics." + ", total=" + numTotalContainers 665 + ", completed=" + numCompletedContainers.get() + ", allocated=" 666 + numAllocatedContainers.get() + ", failed=" 667 + numFailedContainers.get(); 668 success = false; 669 } 670 try { 671 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 672 } catch (YarnException ex) { 673 LOG.error("Failed to unregister application", ex); 674 } catch (IOException e) { 675 LOG.error("Failed to unregister application", e); 676 } 677 678 amRMClient.stop(); 679 680 return success; 681 } 682 683 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 684 @SuppressWarnings("unchecked") 685 @Override 686 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 687 LOG.info("Got response from RM for container ask, completedCnt=" 688 + completedContainers.size()); 689 for (ContainerStatus containerStatus : completedContainers) { 690 LOG.info("Got container status for containerID=" 691 + containerStatus.getContainerId() + ", state=" 692 + containerStatus.getState() + ", exitStatus=" 693 + containerStatus.getExitStatus() + ", diagnostics=" 694 + containerStatus.getDiagnostics()); 695 696 // non complete containers should not be here 697 assert (containerStatus.getState() == ContainerState.COMPLETE); 698 699 // increment counters for completed/failed containers 700 int exitStatus = containerStatus.getExitStatus(); 701 if (0 != exitStatus) { 702 // container failed 703 if (ContainerExitStatus.ABORTED != exitStatus) { 704 // shell script failed 705 // counts as completed 706 numCompletedContainers.incrementAndGet(); 707 numFailedContainers.incrementAndGet(); 708 } else { 709 // container was killed by framework, possibly preempted 710 // we should re-try as the container was lost for some reason 711 numAllocatedContainers.decrementAndGet(); 712 numRequestedContainers.decrementAndGet(); 713 // we do not need to release the container as it would be done 714 // by the RM 715 } 716 } else { 717 // nothing to do 718 // container completed successfully 719 numCompletedContainers.incrementAndGet(); 720 LOG.info("Container completed successfully." + ", containerId=" 721 + containerStatus.getContainerId()); 722 } 723 try { 724 publishContainerEndEvent(timelineClient, containerStatus); 725 } catch (Exception e) { 726 LOG.error("Container start event could not be pulished for " 727 + containerStatus.getContainerId().toString(), e); 728 } 729 } 730 731 // ask for more containers if any failed 732 int askCount = numTotalContainers - numRequestedContainers.get(); 733 numRequestedContainers.addAndGet(askCount); 734 735 if (askCount > 0) { 736 for (int i = 0; i < askCount; ++i) { 737 ContainerRequest containerAsk = setupContainerAskForRM(); 738 amRMClient.addContainerRequest(containerAsk); 739 } 740 } 741 742 if (numCompletedContainers.get() == numTotalContainers) { 743 done = true; 744 } 745 } 746 747 @Override 748 public void onContainersAllocated(List<Container> allocatedContainers) { 749 LOG.info("Got response from RM for container ask, allocatedCnt=" 750 + allocatedContainers.size()); 751 numAllocatedContainers.addAndGet(allocatedContainers.size()); 752 for (Container allocatedContainer : allocatedContainers) { 753 LOG.info("Launching shell command on a new container." 754 + ", containerId=" + allocatedContainer.getId() 755 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 756 + ":" + allocatedContainer.getNodeId().getPort() 757 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 758 + ", containerResourceMemory" 759 + allocatedContainer.getResource().getMemory() 760 + ", containerResourceVirtualCores" 761 + allocatedContainer.getResource().getVirtualCores()); 762 // + ", containerToken" 763 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 764 765 LaunchContainerRunnable runnableLaunchContainer = 766 new LaunchContainerRunnable(allocatedContainer, containerListener); 767 Thread launchThread = new Thread(runnableLaunchContainer); 768 769 // launch and start the container on a separate thread to keep 770 // the main thread unblocked 771 // as all containers may not be allocated at one go. 772 launchThreads.add(launchThread); 773 launchThread.start(); 774 } 775 } 776 777 @Override 778 public void onShutdownRequest() { 779 done = true; 780 } 781 782 @Override 783 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 784 785 @Override 786 public float getProgress() { 787 // set progress to deliver to RM on next heartbeat 788 float progress = (float) numCompletedContainers.get() 789 / numTotalContainers; 790 return progress; 791 } 792 793 @Override 794 public void onError(Throwable e) { 795 done = true; 796 amRMClient.stop(); 797 } 798 } 799 800 @VisibleForTesting 801 static class NMCallbackHandler 802 implements NMClientAsync.CallbackHandler { 803 804 private ConcurrentMap<ContainerId, Container> containers = 805 new ConcurrentHashMap<ContainerId, Container>(); 806 private final ApplicationMaster applicationMaster; 807 808 public NMCallbackHandler(ApplicationMaster applicationMaster) { 809 this.applicationMaster = applicationMaster; 810 } 811 812 public void addContainer(ContainerId containerId, Container container) { 813 containers.putIfAbsent(containerId, container); 814 } 815 816 @Override 817 public void onContainerStopped(ContainerId containerId) { 818 if (LOG.isDebugEnabled()) { 819 LOG.debug("Succeeded to stop Container " + containerId); 820 } 821 containers.remove(containerId); 822 } 823 824 @Override 825 public void onContainerStatusReceived(ContainerId containerId, 826 ContainerStatus containerStatus) { 827 if (LOG.isDebugEnabled()) { 828 LOG.debug("Container Status: id=" + containerId + ", status=" + 829 containerStatus); 830 } 831 } 832 833 @Override 834 public void onContainerStarted(ContainerId containerId, 835 Map<String, ByteBuffer> allServiceResponse) { 836 if (LOG.isDebugEnabled()) { 837 LOG.debug("Succeeded to start Container " + containerId); 838 } 839 Container container = containers.get(containerId); 840 if (container != null) { 841 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 842 } 843 try { 844 ApplicationMaster.publishContainerStartEvent( 845 applicationMaster.timelineClient, container); 846 } catch (Exception e) { 847 LOG.error("Container start event coud not be pulished for " 848 + container.getId().toString(), e); 849 } 850 } 851 852 @Override 853 public void onStartContainerError(ContainerId containerId, Throwable t) { 854 LOG.error("Failed to start Container " + containerId); 855 containers.remove(containerId); 856 applicationMaster.numCompletedContainers.incrementAndGet(); 857 applicationMaster.numFailedContainers.incrementAndGet(); 858 } 859 860 @Override 861 public void onGetContainerStatusError( 862 ContainerId containerId, Throwable t) { 863 LOG.error("Failed to query the status of Container " + containerId); 864 } 865 866 @Override 867 public void onStopContainerError(ContainerId containerId, Throwable t) { 868 LOG.error("Failed to stop Container " + containerId); 869 containers.remove(containerId); 870 } 871 } 872 873 /** 874 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 875 * that will execute the shell command. 876 */ 877 private class LaunchContainerRunnable implements Runnable { 878 879 // Allocated container 880 Container container; 881 882 NMCallbackHandler containerListener; 883 884 /** 885 * @param lcontainer Allocated container 886 * @param containerListener Callback handler of the container 887 */ 888 public LaunchContainerRunnable( 889 Container lcontainer, NMCallbackHandler containerListener) { 890 this.container = lcontainer; 891 this.containerListener = containerListener; 892 } 893 894 @Override 895 /** 896 * Connects to CM, sets up container launch context 897 * for shell command and eventually dispatches the container 898 * start request to the CM. 899 */ 900 public void run() { 901 LOG.info("Setting up container launch container for containerid=" 902 + container.getId()); 903 ContainerLaunchContext ctx = Records 904 .newRecord(ContainerLaunchContext.class); 905 906 // Set the environment 907 ctx.setEnvironment(shellEnv); 908 909 // Set the local resources 910 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 911 912 // The container for the eventual shell commands needs its own local 913 // resources too. 914 // In this scenario, if a shell script is specified, we need to have it 915 // copied and made available to the container. 916 if (!scriptPath.isEmpty()) { 917 Path renamedScriptPath = null; 918 if (Shell.WINDOWS) { 919 renamedScriptPath = new Path(scriptPath + ".bat"); 920 } else { 921 renamedScriptPath = new Path(scriptPath + ".sh"); 922 } 923 924 try { 925 // rename the script file based on the underlying OS syntax. 926 renameScriptFile(renamedScriptPath); 927 } catch (Exception e) { 928 LOG.error( 929 "Not able to add suffix (.bat/.sh) to the shell script filename", 930 e); 931 // We know we cannot continue launching the container 932 // so we should release it. 933 numCompletedContainers.incrementAndGet(); 934 numFailedContainers.incrementAndGet(); 935 return; 936 } 937 938 LocalResource shellRsrc = Records.newRecord(LocalResource.class); 939 shellRsrc.setType(LocalResourceType.FILE); 940 shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); 941 try { 942 shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI( 943 renamedScriptPath.toString()))); 944 } catch (URISyntaxException e) { 945 LOG.error("Error when trying to use shell script path specified" 946 + " in env, path=" + renamedScriptPath, e); 947 948 // A failure scenario on bad input such as invalid shell script path 949 // We know we cannot continue launching the container 950 // so we should release it. 951 // TODO 952 numCompletedContainers.incrementAndGet(); 953 numFailedContainers.incrementAndGet(); 954 return; 955 } 956 shellRsrc.setTimestamp(shellScriptPathTimestamp); 957 shellRsrc.setSize(shellScriptPathLen); 958 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 959 ExecShellStringPath, shellRsrc); 960 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 961 } 962 ctx.setLocalResources(localResources); 963 964 // Set the necessary command to execute on the allocated container 965 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 966 967 // Set executable command 968 vargs.add(shellCommand); 969 // Set shell script path 970 if (!scriptPath.isEmpty()) { 971 vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath 972 : ExecShellStringPath); 973 } 974 975 // Set args for the shell command if any 976 vargs.add(shellArgs); 977 // Add log redirect params 978 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 979 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 980 981 // Get final commmand 982 StringBuilder command = new StringBuilder(); 983 for (CharSequence str : vargs) { 984 command.append(str).append(" "); 985 } 986 987 List<String> commands = new ArrayList<String>(); 988 commands.add(command.toString()); 989 ctx.setCommands(commands); 990 991 // Set up tokens for the container too. Today, for normal shell commands, 992 // the container in distribute-shell doesn't need any tokens. We are 993 // populating them mainly for NodeManagers to be able to download any 994 // files in the distributed file-system. The tokens are otherwise also 995 // useful in cases, for e.g., when one is running a "hadoop dfs" command 996 // inside the distributed shell. 997 ctx.setTokens(allTokens.duplicate()); 998 999 containerListener.addContainer(container.getId(), container); 1000 nmClientAsync.startContainerAsync(container, ctx); 1001 } 1002 } 1003 1004 private void renameScriptFile(final Path renamedScriptPath) 1005 throws IOException, InterruptedException { 1006 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1007 @Override 1008 public Void run() throws IOException { 1009 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1010 fs.rename(new Path(scriptPath), renamedScriptPath); 1011 return null; 1012 } 1013 }); 1014 LOG.info("User " + appSubmitterUgi.getUserName() 1015 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1016 } 1017 1018 /** 1019 * Setup the request that will be sent to the RM for the container ask. 1020 * 1021 * @return the setup ResourceRequest to be sent to RM 1022 */ 1023 private ContainerRequest setupContainerAskForRM() { 1024 // setup requirements for hosts 1025 // using * as any host will do for the distributed shell app 1026 // set the priority for the request 1027 Priority pri = Records.newRecord(Priority.class); 1028 // TODO - what is the range for priority? how to decide? 1029 pri.setPriority(requestPriority); 1030 1031 // Set up resource type requirements 1032 // For now, memory and CPU are supported so we set memory and cpu requirements 1033 Resource capability = Records.newRecord(Resource.class); 1034 capability.setMemory(containerMemory); 1035 capability.setVirtualCores(containerVirtualCores); 1036 1037 ContainerRequest request = new ContainerRequest(capability, null, null, 1038 pri); 1039 LOG.info("Requested container ask: " + request.toString()); 1040 return request; 1041 } 1042 1043 private boolean fileExist(String filePath) { 1044 return new File(filePath).exists(); 1045 } 1046 1047 private String readContent(String filePath) throws IOException { 1048 DataInputStream ds = null; 1049 try { 1050 ds = new DataInputStream(new FileInputStream(filePath)); 1051 return ds.readUTF(); 1052 } finally { 1053 org.apache.commons.io.IOUtils.closeQuietly(ds); 1054 } 1055 } 1056 1057 private static void publishContainerStartEvent(TimelineClient timelineClient, 1058 Container container) throws IOException, YarnException { 1059 TimelineEntity entity = new TimelineEntity(); 1060 entity.setEntityId(container.getId().toString()); 1061 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1062 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() 1063 .toString()); 1064 TimelineEvent event = new TimelineEvent(); 1065 event.setTimestamp(System.currentTimeMillis()); 1066 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1067 event.addEventInfo("Node", container.getNodeId().toString()); 1068 event.addEventInfo("Resources", container.getResource().toString()); 1069 entity.addEvent(event); 1070 1071 timelineClient.putEntities(entity); 1072 } 1073 1074 private static void publishContainerEndEvent(TimelineClient timelineClient, 1075 ContainerStatus container) throws IOException, YarnException { 1076 TimelineEntity entity = new TimelineEntity(); 1077 entity.setEntityId(container.getContainerId().toString()); 1078 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1079 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() 1080 .toString()); 1081 TimelineEvent event = new TimelineEvent(); 1082 event.setTimestamp(System.currentTimeMillis()); 1083 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1084 event.addEventInfo("State", container.getState().name()); 1085 event.addEventInfo("Exit Status", container.getExitStatus()); 1086 entity.addEvent(event); 1087 1088 timelineClient.putEntities(entity); 1089 } 1090 1091 private static void publishApplicationAttemptEvent( 1092 TimelineClient timelineClient, String appAttemptId, DSEvent appEvent) 1093 throws IOException, YarnException { 1094 TimelineEntity entity = new TimelineEntity(); 1095 entity.setEntityId(appAttemptId); 1096 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1097 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser() 1098 .toString()); 1099 TimelineEvent event = new TimelineEvent(); 1100 event.setEventType(appEvent.toString()); 1101 event.setTimestamp(System.currentTimeMillis()); 1102 entity.addEvent(event); 1103 1104 timelineClient.putEntities(entity); 1105 } 1106 }