001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Vector; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import org.apache.commons.cli.CommandLine; 043import org.apache.commons.cli.GnuParser; 044import org.apache.commons.cli.HelpFormatter; 045import org.apache.commons.cli.Options; 046import org.apache.commons.cli.ParseException; 047import org.apache.commons.logging.Log; 048import org.apache.commons.logging.LogFactory; 049import 
org.apache.hadoop.classification.InterfaceAudience; 050import org.apache.hadoop.classification.InterfaceAudience.Private; 051import org.apache.hadoop.classification.InterfaceStability; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FileSystem; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.io.DataOutputBuffer; 056import org.apache.hadoop.io.IOUtils; 057import org.apache.hadoop.net.NetUtils; 058import org.apache.hadoop.security.Credentials; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.hadoop.security.token.Token; 061import org.apache.hadoop.util.ExitUtil; 062import org.apache.hadoop.util.Shell; 063import org.apache.hadoop.yarn.api.ApplicationConstants; 064import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 065import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 066import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 067import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 068import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 069import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 072import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 073import org.apache.hadoop.yarn.api.records.Container; 074import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 075import org.apache.hadoop.yarn.api.records.ContainerId; 076import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 077import org.apache.hadoop.yarn.api.records.ContainerState; 078import org.apache.hadoop.yarn.api.records.ContainerStatus; 079import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 080import org.apache.hadoop.yarn.api.records.LocalResource; 081import org.apache.hadoop.yarn.api.records.LocalResourceType; 082import 
org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 083import org.apache.hadoop.yarn.api.records.NodeReport; 084import org.apache.hadoop.yarn.api.records.Priority; 085import org.apache.hadoop.yarn.api.records.Resource; 086import org.apache.hadoop.yarn.api.records.ResourceRequest; 087import org.apache.hadoop.yarn.api.records.URL; 088import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 089import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 090import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 091import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 092import org.apache.hadoop.yarn.client.api.TimelineClient; 093import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 094import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 095import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 096import org.apache.hadoop.yarn.conf.YarnConfiguration; 097import org.apache.hadoop.yarn.exceptions.YarnException; 098import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 099import org.apache.hadoop.yarn.util.ConverterUtils; 100import org.apache.log4j.LogManager; 101 102import com.google.common.annotations.VisibleForTesting; 103 104/** 105 * An ApplicationMaster for executing shell commands on a set of launched 106 * containers using the YARN framework. 107 * 108 * <p> 109 * This class is meant to act as an example on how to write yarn-based 110 * application masters. 111 * </p> 112 * 113 * <p> 114 * The ApplicationMaster is started on a container by the 115 * <code>ResourceManager</code>'s launcher. The first thing that the 116 * <code>ApplicationMaster</code> needs to do is to connect and register itself 117 * with the <code>ResourceManager</code>. 
The registration sets up information
 * within the <code>ResourceManager</code> about the host:port on which the
 * ApplicationMaster listens to provide functionality to a
 * client, as well as a tracking URL that a client can use to keep track of
 * status/job history if needed. However, in the distributed shell, the tracking URL
 * and appMasterHost:appMasterRpcPort are not supported.
 * </p>
 *
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code> from the
 * <code>ApplicationMaster</code> acts as a heartbeat.
 * </p>
 *
 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the required number of containers from the <code>ResourceManager</code>
 * via {@link AllocateRequest}, using {@link ResourceRequest} with the necessary
 * resource specifications such as node location and computational
 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
 * responds with an {@link AllocateResponse} that informs the
 * <code>ApplicationMaster</code> of the set of newly allocated containers,
 * completed containers, and the current state of available resources.
 * </p>
 *
 * <p>
 * For each allocated container, the <code>ApplicationMaster</code> can then set
 * up the necessary launch context via {@link ContainerLaunchContext} to specify
 * the allocated container id, local resources required by the executable, the
 * environment to be set up for the executable, commands to execute, etc., and
 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
 * launch and execute the defined commands on the given allocated container.
150 * </p> 151 * 152 * <p> 153 * The <code>ApplicationMaster</code> can monitor the launched container by 154 * either querying the <code>ResourceManager</code> using 155 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 156 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 157 * container's {@link ContainerId}. 158 * 159 * <p> 160 * After the job has been completed, the <code>ApplicationMaster</code> has to 161 * send a {@link FinishApplicationMasterRequest} to the 162 * <code>ResourceManager</code> to inform it that the 163 * <code>ApplicationMaster</code> has been completed. 164 */ 165@InterfaceAudience.Public 166@InterfaceStability.Unstable 167public class ApplicationMaster { 168 169 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 170 171 @VisibleForTesting 172 @Private 173 public static enum DSEvent { 174 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 175 } 176 177 @VisibleForTesting 178 @Private 179 public static enum DSEntity { 180 DS_APP_ATTEMPT, DS_CONTAINER 181 } 182 183 // Configuration 184 private Configuration conf; 185 186 // Handle to communicate with the Resource Manager 187 @SuppressWarnings("rawtypes") 188 private AMRMClientAsync amRMClient; 189 190 // In both secure and non-secure modes, this points to the job-submitter. 
191 @VisibleForTesting 192 UserGroupInformation appSubmitterUgi; 193 194 // Handle to communicate with the Node Manager 195 private NMClientAsync nmClientAsync; 196 // Listen to process the response from the Node Manager 197 private NMCallbackHandler containerListener; 198 199 // Application Attempt Id ( combination of attemptId and fail count ) 200 @VisibleForTesting 201 protected ApplicationAttemptId appAttemptID; 202 203 // TODO 204 // For status update for clients - yet to be implemented 205 // Hostname of the container 206 private String appMasterHostname = ""; 207 // Port on which the app master listens for status updates from clients 208 private int appMasterRpcPort = -1; 209 // Tracking url to which app master publishes info for clients to monitor 210 private String appMasterTrackingUrl = ""; 211 212 // App Master configuration 213 // No. of containers to run shell command on 214 @VisibleForTesting 215 protected int numTotalContainers = 1; 216 // Memory to request for the container on which the shell command will run 217 private int containerMemory = 10; 218 // VirtualCores to request for the container on which the shell command will run 219 private int containerVirtualCores = 1; 220 // Priority of the request 221 private int requestPriority; 222 223 // Counter for completed containers ( complete denotes successful or failed ) 224 private AtomicInteger numCompletedContainers = new AtomicInteger(); 225 // Allocated container count so that we know how many containers has the RM 226 // allocated to us 227 @VisibleForTesting 228 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 229 // Count of failed containers 230 private AtomicInteger numFailedContainers = new AtomicInteger(); 231 // Count of containers already requested from the RM 232 // Needed as once requested, we should not request for containers again. 233 // Only request for more if the original requirement changes. 
234 @VisibleForTesting 235 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 236 237 // Shell command to be executed 238 private String shellCommand = ""; 239 // Args to be passed to the shell command 240 private String shellArgs = ""; 241 // Env variables to be setup for the shell command 242 private Map<String, String> shellEnv = new HashMap<String, String>(); 243 244 // Location of shell script ( obtained from info set in env ) 245 // Shell script path in fs 246 private String scriptPath = ""; 247 // Timestamp needed for creating a local resource 248 private long shellScriptPathTimestamp = 0; 249 // File length needed for local resource 250 private long shellScriptPathLen = 0; 251 252 // Timeline domain ID 253 private String domainId = null; 254 255 // Hardcoded path to shell script in launch container's local env 256 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; 257 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH 258 + ".bat"; 259 260 // Hardcoded path to custom log_properties 261 private static final String log4jPath = "log4j.properties"; 262 263 private static final String shellCommandPath = "shellCommands"; 264 private static final String shellArgsPath = "shellArgs"; 265 266 private volatile boolean done; 267 268 private ByteBuffer allTokens; 269 270 // Launch threads 271 private List<Thread> launchThreads = new ArrayList<Thread>(); 272 273 // Timeline Client 274 @VisibleForTesting 275 TimelineClient timelineClient; 276 277 private final String linux_bash_command = "bash"; 278 private final String windows_command = "cmd /c"; 279 280 /** 281 * @param args Command line args 282 */ 283 public static void main(String[] args) { 284 boolean result = false; 285 try { 286 ApplicationMaster appMaster = new ApplicationMaster(); 287 LOG.info("Initializing ApplicationMaster"); 288 boolean doRun = appMaster.init(args); 289 if (!doRun) { 290 System.exit(0); 291 } 292 appMaster.run(); 293 result = 
appMaster.finish(); 294 } catch (Throwable t) { 295 LOG.fatal("Error running ApplicationMaster", t); 296 LogManager.shutdown(); 297 ExitUtil.terminate(1, t); 298 } 299 if (result) { 300 LOG.info("Application Master completed successfully. exiting"); 301 System.exit(0); 302 } else { 303 LOG.info("Application Master failed. exiting"); 304 System.exit(2); 305 } 306 } 307 308 /** 309 * Dump out contents of $CWD and the environment to stdout for debugging 310 */ 311 private void dumpOutDebugInfo() { 312 313 LOG.info("Dump debug output"); 314 Map<String, String> envs = System.getenv(); 315 for (Map.Entry<String, String> env : envs.entrySet()) { 316 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 317 System.out.println("System env: key=" + env.getKey() + ", val=" 318 + env.getValue()); 319 } 320 321 BufferedReader buf = null; 322 try { 323 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : 324 Shell.execCommand("ls", "-al"); 325 buf = new BufferedReader(new StringReader(lines)); 326 String line = ""; 327 while ((line = buf.readLine()) != null) { 328 LOG.info("System CWD content: " + line); 329 System.out.println("System CWD content: " + line); 330 } 331 } catch (IOException e) { 332 e.printStackTrace(); 333 } finally { 334 IOUtils.cleanup(LOG, buf); 335 } 336 } 337 338 public ApplicationMaster() { 339 // Set up the configuration 340 conf = new YarnConfiguration(); 341 } 342 343 /** 344 * Parse command line options 345 * 346 * @param args Command line args 347 * @return Whether init successful and run should be invoked 348 * @throws ParseException 349 * @throws IOException 350 */ 351 public boolean init(String[] args) throws ParseException, IOException { 352 Options opts = new Options(); 353 opts.addOption("app_attempt_id", true, 354 "App Attempt ID. Not to be used unless for testing purposes"); 355 opts.addOption("shell_env", true, 356 "Environment for shell script. 
Specified as env_key=env_val pairs"); 357 opts.addOption("container_memory", true, 358 "Amount of memory in MB to be requested to run the shell command"); 359 opts.addOption("container_vcores", true, 360 "Amount of virtual cores to be requested to run the shell command"); 361 opts.addOption("num_containers", true, 362 "No. of containers on which the shell command needs to be executed"); 363 opts.addOption("priority", true, "Application Priority. Default 0"); 364 opts.addOption("debug", false, "Dump out debug information"); 365 366 opts.addOption("help", false, "Print usage"); 367 CommandLine cliParser = new GnuParser().parse(opts, args); 368 369 if (args.length == 0) { 370 printUsage(opts); 371 throw new IllegalArgumentException( 372 "No args specified for application master to initialize"); 373 } 374 375 //Check whether customer log4j.properties file exists 376 if (fileExist(log4jPath)) { 377 try { 378 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 379 log4jPath); 380 } catch (Exception e) { 381 LOG.warn("Can not set up custom log4j properties. 
" + e); 382 } 383 } 384 385 if (cliParser.hasOption("help")) { 386 printUsage(opts); 387 return false; 388 } 389 390 if (cliParser.hasOption("debug")) { 391 dumpOutDebugInfo(); 392 } 393 394 Map<String, String> envs = System.getenv(); 395 396 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 397 if (cliParser.hasOption("app_attempt_id")) { 398 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 399 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 400 } else { 401 throw new IllegalArgumentException( 402 "Application Attempt Id not set in the environment"); 403 } 404 } else { 405 ContainerId containerId = ConverterUtils.toContainerId(envs 406 .get(Environment.CONTAINER_ID.name())); 407 appAttemptID = containerId.getApplicationAttemptId(); 408 } 409 410 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 411 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 412 + " not set in the environment"); 413 } 414 if (!envs.containsKey(Environment.NM_HOST.name())) { 415 throw new RuntimeException(Environment.NM_HOST.name() 416 + " not set in the environment"); 417 } 418 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 419 throw new RuntimeException(Environment.NM_HTTP_PORT 420 + " not set in the environment"); 421 } 422 if (!envs.containsKey(Environment.NM_PORT.name())) { 423 throw new RuntimeException(Environment.NM_PORT.name() 424 + " not set in the environment"); 425 } 426 427 LOG.info("Application master for app" + ", appId=" 428 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 429 + appAttemptID.getApplicationId().getClusterTimestamp() 430 + ", attemptId=" + appAttemptID.getAttemptId()); 431 432 if (!fileExist(shellCommandPath) 433 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 434 throw new IllegalArgumentException( 435 "No shell command or shell script specified to be executed by application master"); 436 } 437 438 if (fileExist(shellCommandPath)) { 439 
shellCommand = readContent(shellCommandPath); 440 } 441 442 if (fileExist(shellArgsPath)) { 443 shellArgs = readContent(shellArgsPath); 444 } 445 446 if (cliParser.hasOption("shell_env")) { 447 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 448 for (String env : shellEnvs) { 449 env = env.trim(); 450 int index = env.indexOf('='); 451 if (index == -1) { 452 shellEnv.put(env, ""); 453 continue; 454 } 455 String key = env.substring(0, index); 456 String val = ""; 457 if (index < (env.length() - 1)) { 458 val = env.substring(index + 1); 459 } 460 shellEnv.put(key, val); 461 } 462 } 463 464 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 465 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 466 467 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 468 shellScriptPathTimestamp = Long.parseLong(envs 469 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 470 } 471 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 472 shellScriptPathLen = Long.parseLong(envs 473 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 474 } 475 if (!scriptPath.isEmpty() 476 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 477 LOG.error("Illegal values in env for shell script path" + ", path=" 478 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 479 + shellScriptPathTimestamp); 480 throw new IllegalArgumentException( 481 "Illegal values in env for shell script path"); 482 } 483 } 484 485 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 486 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 487 } 488 489 containerMemory = Integer.parseInt(cliParser.getOptionValue( 490 "container_memory", "10")); 491 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 492 "container_vcores", "1")); 493 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 494 "num_containers", "1")); 495 if (numTotalContainers == 0) { 496 throw new 
IllegalArgumentException( 497 "Cannot run distributed shell with no containers"); 498 } 499 requestPriority = Integer.parseInt(cliParser 500 .getOptionValue("priority", "0")); 501 return true; 502 } 503 504 /** 505 * Helper function to print usage 506 * 507 * @param opts Parsed command line options 508 */ 509 private void printUsage(Options opts) { 510 new HelpFormatter().printHelp("ApplicationMaster", opts); 511 } 512 513 /** 514 * Main run function for the application master 515 * 516 * @throws YarnException 517 * @throws IOException 518 */ 519 @SuppressWarnings({ "unchecked" }) 520 public void run() throws YarnException, IOException, InterruptedException { 521 LOG.info("Starting ApplicationMaster"); 522 523 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 524 // are marked as LimitedPrivate 525 Credentials credentials = 526 UserGroupInformation.getCurrentUser().getCredentials(); 527 DataOutputBuffer dob = new DataOutputBuffer(); 528 credentials.writeTokenStorageToStream(dob); 529 // Now remove the AM->RM token so that containers cannot access it. 
530 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 531 LOG.info("Executing with tokens:"); 532 while (iter.hasNext()) { 533 Token<?> token = iter.next(); 534 LOG.info(token); 535 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 536 iter.remove(); 537 } 538 } 539 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 540 541 // Create appSubmitterUgi and add original tokens to it 542 String appSubmitterUserName = 543 System.getenv(ApplicationConstants.Environment.USER.name()); 544 appSubmitterUgi = 545 UserGroupInformation.createRemoteUser(appSubmitterUserName); 546 appSubmitterUgi.addCredentials(credentials); 547 548 549 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 550 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 551 amRMClient.init(conf); 552 amRMClient.start(); 553 554 containerListener = createNMCallbackHandler(); 555 nmClientAsync = new NMClientAsyncImpl(containerListener); 556 nmClientAsync.init(conf); 557 nmClientAsync.start(); 558 559 startTimelineClient(conf); 560 if(timelineClient != null) { 561 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 562 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 563 } 564 565 // Setup local RPC Server to accept status requests directly from clients 566 // TODO need to setup a protocol for client to be able to communicate to 567 // the RPC server 568 // TODO use the rpc port info to register with the RM for the client to 569 // send requests to this app master 570 571 // Register self with ResourceManager 572 // This will start heartbeating to the RM 573 appMasterHostname = NetUtils.getHostname(); 574 RegisterApplicationMasterResponse response = amRMClient 575 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 576 appMasterTrackingUrl); 577 // Dump out information about cluster capability as seen by the 578 // resource manager 579 int maxMem = 
response.getMaximumResourceCapability().getMemory(); 580 LOG.info("Max mem capabililty of resources in this cluster " + maxMem); 581 582 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 583 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); 584 585 // A resource ask cannot exceed the max. 586 if (containerMemory > maxMem) { 587 LOG.info("Container memory specified above max threshold of cluster." 588 + " Using max value." + ", specified=" + containerMemory + ", max=" 589 + maxMem); 590 containerMemory = maxMem; 591 } 592 593 if (containerVirtualCores > maxVCores) { 594 LOG.info("Container virtual cores specified above max threshold of cluster." 595 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 596 + maxVCores); 597 containerVirtualCores = maxVCores; 598 } 599 600 List<Container> previousAMRunningContainers = 601 response.getContainersFromPreviousAttempts(); 602 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 603 + " previous attempts' running containers on AM registration."); 604 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 605 606 int numTotalContainersToRequest = 607 numTotalContainers - previousAMRunningContainers.size(); 608 // Setup ask for containers from RM 609 // Send request for containers to RM 610 // Until we get our fully allocated quota, we keep on polling RM for 611 // containers 612 // Keep looping until all the containers are launched and shell script 613 // executed on them ( regardless of success/failure). 
614 for (int i = 0; i < numTotalContainersToRequest; ++i) { 615 ContainerRequest containerAsk = setupContainerAskForRM(); 616 amRMClient.addContainerRequest(containerAsk); 617 } 618 numRequestedContainers.set(numTotalContainers); 619 } 620 621 @VisibleForTesting 622 void startTimelineClient(final Configuration conf) 623 throws YarnException, IOException, InterruptedException { 624 try { 625 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 626 @Override 627 public Void run() throws Exception { 628 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 629 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 630 // Creating the Timeline Client 631 timelineClient = TimelineClient.createTimelineClient(); 632 timelineClient.init(conf); 633 timelineClient.start(); 634 } else { 635 timelineClient = null; 636 LOG.warn("Timeline service is not enabled"); 637 } 638 return null; 639 } 640 }); 641 } catch (UndeclaredThrowableException e) { 642 throw new YarnException(e.getCause()); 643 } 644 } 645 646 @VisibleForTesting 647 NMCallbackHandler createNMCallbackHandler() { 648 return new NMCallbackHandler(this); 649 } 650 651 @VisibleForTesting 652 protected boolean finish() { 653 // wait for completion. 
654 while (!done 655 && (numCompletedContainers.get() != numTotalContainers)) { 656 try { 657 Thread.sleep(200); 658 } catch (InterruptedException ex) {} 659 } 660 661 if(timelineClient != null) { 662 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 663 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 664 } 665 666 // Join all launched threads 667 // needed for when we time out 668 // and we need to release containers 669 for (Thread launchThread : launchThreads) { 670 try { 671 launchThread.join(10000); 672 } catch (InterruptedException e) { 673 LOG.info("Exception thrown in thread join: " + e.getMessage()); 674 e.printStackTrace(); 675 } 676 } 677 678 // When the application completes, it should stop all running containers 679 LOG.info("Application completed. Stopping running containers"); 680 nmClientAsync.stop(); 681 682 // When the application completes, it should send a finish application 683 // signal to the RM 684 LOG.info("Application completed. Signalling finish to RM"); 685 686 FinalApplicationStatus appStatus; 687 String appMessage = null; 688 boolean success = true; 689 if (numFailedContainers.get() == 0 && 690 numCompletedContainers.get() == numTotalContainers) { 691 appStatus = FinalApplicationStatus.SUCCEEDED; 692 } else { 693 appStatus = FinalApplicationStatus.FAILED; 694 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 695 + ", completed=" + numCompletedContainers.get() + ", allocated=" 696 + numAllocatedContainers.get() + ", failed=" 697 + numFailedContainers.get(); 698 LOG.info(appMessage); 699 success = false; 700 } 701 try { 702 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 703 } catch (YarnException ex) { 704 LOG.error("Failed to unregister application", ex); 705 } catch (IOException e) { 706 LOG.error("Failed to unregister application", e); 707 } 708 709 amRMClient.stop(); 710 711 // Stop Timeline Client 712 if(timelineClient != null) { 713 timelineClient.stop(); 714 } 715 716 return success; 717 } 718 719 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 720 @SuppressWarnings("unchecked") 721 @Override 722 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 723 LOG.info("Got response from RM for container ask, completedCnt=" 724 + completedContainers.size()); 725 for (ContainerStatus containerStatus : completedContainers) { 726 LOG.info(appAttemptID + " got container status for containerID=" 727 + containerStatus.getContainerId() + ", state=" 728 + containerStatus.getState() + ", exitStatus=" 729 + containerStatus.getExitStatus() + ", diagnostics=" 730 + containerStatus.getDiagnostics()); 731 732 // non complete containers should not be here 733 assert (containerStatus.getState() == ContainerState.COMPLETE); 734 735 // increment counters for completed/failed containers 736 int exitStatus = containerStatus.getExitStatus(); 737 if (0 != exitStatus) { 738 // container failed 739 if (ContainerExitStatus.ABORTED != exitStatus) { 740 // shell script failed 741 // counts as completed 742 numCompletedContainers.incrementAndGet(); 743 numFailedContainers.incrementAndGet(); 744 } else { 745 // container was killed by framework, possibly preempted 746 // we should re-try as the container was lost for some reason 747 numAllocatedContainers.decrementAndGet(); 748 
numRequestedContainers.decrementAndGet(); 749 // we do not need to release the container as it would be done 750 // by the RM 751 } 752 } else { 753 // nothing to do 754 // container completed successfully 755 numCompletedContainers.incrementAndGet(); 756 LOG.info("Container completed successfully." + ", containerId=" 757 + containerStatus.getContainerId()); 758 } 759 if(timelineClient != null) { 760 publishContainerEndEvent( 761 timelineClient, containerStatus, domainId, appSubmitterUgi); 762 } 763 } 764 765 // ask for more containers if any failed 766 int askCount = numTotalContainers - numRequestedContainers.get(); 767 numRequestedContainers.addAndGet(askCount); 768 769 if (askCount > 0) { 770 for (int i = 0; i < askCount; ++i) { 771 ContainerRequest containerAsk = setupContainerAskForRM(); 772 amRMClient.addContainerRequest(containerAsk); 773 } 774 } 775 776 if (numCompletedContainers.get() == numTotalContainers) { 777 done = true; 778 } 779 } 780 781 @Override 782 public void onContainersAllocated(List<Container> allocatedContainers) { 783 LOG.info("Got response from RM for container ask, allocatedCnt=" 784 + allocatedContainers.size()); 785 numAllocatedContainers.addAndGet(allocatedContainers.size()); 786 for (Container allocatedContainer : allocatedContainers) { 787 LOG.info("Launching shell command on a new container." 
788 + ", containerId=" + allocatedContainer.getId() 789 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 790 + ":" + allocatedContainer.getNodeId().getPort() 791 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 792 + ", containerResourceMemory" 793 + allocatedContainer.getResource().getMemory() 794 + ", containerResourceVirtualCores" 795 + allocatedContainer.getResource().getVirtualCores()); 796 // + ", containerToken" 797 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 798 799 LaunchContainerRunnable runnableLaunchContainer = 800 new LaunchContainerRunnable(allocatedContainer, containerListener); 801 Thread launchThread = new Thread(runnableLaunchContainer); 802 803 // launch and start the container on a separate thread to keep 804 // the main thread unblocked 805 // as all containers may not be allocated at one go. 806 launchThreads.add(launchThread); 807 launchThread.start(); 808 } 809 } 810 811 @Override 812 public void onShutdownRequest() { 813 done = true; 814 } 815 816 @Override 817 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 818 819 @Override 820 public float getProgress() { 821 // set progress to deliver to RM on next heartbeat 822 float progress = (float) numCompletedContainers.get() 823 / numTotalContainers; 824 return progress; 825 } 826 827 @Override 828 public void onError(Throwable e) { 829 done = true; 830 amRMClient.stop(); 831 } 832 } 833 834 @VisibleForTesting 835 static class NMCallbackHandler 836 implements NMClientAsync.CallbackHandler { 837 838 private ConcurrentMap<ContainerId, Container> containers = 839 new ConcurrentHashMap<ContainerId, Container>(); 840 private final ApplicationMaster applicationMaster; 841 842 public NMCallbackHandler(ApplicationMaster applicationMaster) { 843 this.applicationMaster = applicationMaster; 844 } 845 846 public void addContainer(ContainerId containerId, Container container) { 847 containers.putIfAbsent(containerId, container); 848 } 
849 850 @Override 851 public void onContainerStopped(ContainerId containerId) { 852 if (LOG.isDebugEnabled()) { 853 LOG.debug("Succeeded to stop Container " + containerId); 854 } 855 containers.remove(containerId); 856 } 857 858 @Override 859 public void onContainerStatusReceived(ContainerId containerId, 860 ContainerStatus containerStatus) { 861 if (LOG.isDebugEnabled()) { 862 LOG.debug("Container Status: id=" + containerId + ", status=" + 863 containerStatus); 864 } 865 } 866 867 @Override 868 public void onContainerStarted(ContainerId containerId, 869 Map<String, ByteBuffer> allServiceResponse) { 870 if (LOG.isDebugEnabled()) { 871 LOG.debug("Succeeded to start Container " + containerId); 872 } 873 Container container = containers.get(containerId); 874 if (container != null) { 875 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 876 } 877 if(applicationMaster.timelineClient != null) { 878 ApplicationMaster.publishContainerStartEvent( 879 applicationMaster.timelineClient, container, 880 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 881 } 882 } 883 884 @Override 885 public void onStartContainerError(ContainerId containerId, Throwable t) { 886 LOG.error("Failed to start Container " + containerId); 887 containers.remove(containerId); 888 applicationMaster.numCompletedContainers.incrementAndGet(); 889 applicationMaster.numFailedContainers.incrementAndGet(); 890 } 891 892 @Override 893 public void onGetContainerStatusError( 894 ContainerId containerId, Throwable t) { 895 LOG.error("Failed to query the status of Container " + containerId); 896 } 897 898 @Override 899 public void onStopContainerError(ContainerId containerId, Throwable t) { 900 LOG.error("Failed to stop Container " + containerId); 901 containers.remove(containerId); 902 } 903 } 904 905 /** 906 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 907 * that will execute the shell command. 
908 */ 909 private class LaunchContainerRunnable implements Runnable { 910 911 // Allocated container 912 Container container; 913 914 NMCallbackHandler containerListener; 915 916 /** 917 * @param lcontainer Allocated container 918 * @param containerListener Callback handler of the container 919 */ 920 public LaunchContainerRunnable( 921 Container lcontainer, NMCallbackHandler containerListener) { 922 this.container = lcontainer; 923 this.containerListener = containerListener; 924 } 925 926 @Override 927 /** 928 * Connects to CM, sets up container launch context 929 * for shell command and eventually dispatches the container 930 * start request to the CM. 931 */ 932 public void run() { 933 LOG.info("Setting up container launch container for containerid=" 934 + container.getId()); 935 936 // Set the local resources 937 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 938 939 // The container for the eventual shell commands needs its own local 940 // resources too. 941 // In this scenario, if a shell script is specified, we need to have it 942 // copied and made available to the container. 943 if (!scriptPath.isEmpty()) { 944 Path renamedScriptPath = null; 945 if (Shell.WINDOWS) { 946 renamedScriptPath = new Path(scriptPath + ".bat"); 947 } else { 948 renamedScriptPath = new Path(scriptPath + ".sh"); 949 } 950 951 try { 952 // rename the script file based on the underlying OS syntax. 953 renameScriptFile(renamedScriptPath); 954 } catch (Exception e) { 955 LOG.error( 956 "Not able to add suffix (.bat/.sh) to the shell script filename", 957 e); 958 // We know we cannot continue launching the container 959 // so we should release it. 
960 numCompletedContainers.incrementAndGet(); 961 numFailedContainers.incrementAndGet(); 962 return; 963 } 964 965 URL yarnUrl = null; 966 try { 967 yarnUrl = ConverterUtils.getYarnUrlFromURI( 968 new URI(renamedScriptPath.toString())); 969 } catch (URISyntaxException e) { 970 LOG.error("Error when trying to use shell script path specified" 971 + " in env, path=" + renamedScriptPath, e); 972 // A failure scenario on bad input such as invalid shell script path 973 // We know we cannot continue launching the container 974 // so we should release it. 975 // TODO 976 numCompletedContainers.incrementAndGet(); 977 numFailedContainers.incrementAndGet(); 978 return; 979 } 980 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 981 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 982 shellScriptPathLen, shellScriptPathTimestamp); 983 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 984 ExecShellStringPath, shellRsrc); 985 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 986 } 987 988 // Set the necessary command to execute on the allocated container 989 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 990 991 // Set executable command 992 vargs.add(shellCommand); 993 // Set shell script path 994 if (!scriptPath.isEmpty()) { 995 vargs.add(Shell.WINDOWS ? 
ExecBatScripStringtPath 996 : ExecShellStringPath); 997 } 998 999 // Set args for the shell command if any 1000 vargs.add(shellArgs); 1001 // Add log redirect params 1002 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1003 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1004 1005 // Get final commmand 1006 StringBuilder command = new StringBuilder(); 1007 for (CharSequence str : vargs) { 1008 command.append(str).append(" "); 1009 } 1010 1011 List<String> commands = new ArrayList<String>(); 1012 commands.add(command.toString()); 1013 1014 // Set up ContainerLaunchContext, setting local resource, environment, 1015 // command and token for constructor. 1016 1017 // Note for tokens: Set up tokens for the container too. Today, for normal 1018 // shell commands, the container in distribute-shell doesn't need any 1019 // tokens. We are populating them mainly for NodeManagers to be able to 1020 // download anyfiles in the distributed file-system. The tokens are 1021 // otherwise also useful in cases, for e.g., when one is running a 1022 // "hadoop dfs" command inside the distributed shell. 
1023 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1024 localResources, shellEnv, commands, null, allTokens.duplicate(), null); 1025 containerListener.addContainer(container.getId(), container); 1026 nmClientAsync.startContainerAsync(container, ctx); 1027 } 1028 } 1029 1030 private void renameScriptFile(final Path renamedScriptPath) 1031 throws IOException, InterruptedException { 1032 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1033 @Override 1034 public Void run() throws IOException { 1035 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1036 fs.rename(new Path(scriptPath), renamedScriptPath); 1037 return null; 1038 } 1039 }); 1040 LOG.info("User " + appSubmitterUgi.getUserName() 1041 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1042 } 1043 1044 /** 1045 * Setup the request that will be sent to the RM for the container ask. 1046 * 1047 * @return the setup ResourceRequest to be sent to RM 1048 */ 1049 private ContainerRequest setupContainerAskForRM() { 1050 // setup requirements for hosts 1051 // using * as any host will do for the distributed shell app 1052 // set the priority for the request 1053 // TODO - what is the range for priority? how to decide? 
1054 Priority pri = Priority.newInstance(requestPriority); 1055 1056 // Set up resource type requirements 1057 // For now, memory and CPU are supported so we set memory and cpu requirements 1058 Resource capability = Resource.newInstance(containerMemory, 1059 containerVirtualCores); 1060 1061 ContainerRequest request = new ContainerRequest(capability, null, null, 1062 pri); 1063 LOG.info("Requested container ask: " + request.toString()); 1064 return request; 1065 } 1066 1067 private boolean fileExist(String filePath) { 1068 return new File(filePath).exists(); 1069 } 1070 1071 private String readContent(String filePath) throws IOException { 1072 DataInputStream ds = null; 1073 try { 1074 ds = new DataInputStream(new FileInputStream(filePath)); 1075 return ds.readUTF(); 1076 } finally { 1077 org.apache.commons.io.IOUtils.closeQuietly(ds); 1078 } 1079 } 1080 1081 private static void publishContainerStartEvent( 1082 final TimelineClient timelineClient, Container container, String domainId, 1083 UserGroupInformation ugi) { 1084 final TimelineEntity entity = new TimelineEntity(); 1085 entity.setEntityId(container.getId().toString()); 1086 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1087 entity.setDomainId(domainId); 1088 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1089 TimelineEvent event = new TimelineEvent(); 1090 event.setTimestamp(System.currentTimeMillis()); 1091 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1092 event.addEventInfo("Node", container.getNodeId().toString()); 1093 event.addEventInfo("Resources", container.getResource().toString()); 1094 entity.addEvent(event); 1095 1096 try { 1097 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1098 @Override 1099 public TimelinePutResponse run() throws Exception { 1100 return timelineClient.putEntities(entity); 1101 } 1102 }); 1103 } catch (Exception e) { 1104 LOG.error("Container start event could not be published for " 1105 + container.getId().toString(), 1106 e 
instanceof UndeclaredThrowableException ? e.getCause() : e); 1107 } 1108 } 1109 1110 private static void publishContainerEndEvent( 1111 final TimelineClient timelineClient, ContainerStatus container, 1112 String domainId, UserGroupInformation ugi) { 1113 final TimelineEntity entity = new TimelineEntity(); 1114 entity.setEntityId(container.getContainerId().toString()); 1115 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1116 entity.setDomainId(domainId); 1117 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1118 TimelineEvent event = new TimelineEvent(); 1119 event.setTimestamp(System.currentTimeMillis()); 1120 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1121 event.addEventInfo("State", container.getState().name()); 1122 event.addEventInfo("Exit Status", container.getExitStatus()); 1123 entity.addEvent(event); 1124 try { 1125 timelineClient.putEntities(entity); 1126 } catch (YarnException | IOException e) { 1127 LOG.error("Container end event could not be published for " 1128 + container.getContainerId().toString(), e); 1129 } 1130 } 1131 1132 private static void publishApplicationAttemptEvent( 1133 final TimelineClient timelineClient, String appAttemptId, 1134 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1135 final TimelineEntity entity = new TimelineEntity(); 1136 entity.setEntityId(appAttemptId); 1137 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1138 entity.setDomainId(domainId); 1139 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1140 TimelineEvent event = new TimelineEvent(); 1141 event.setEventType(appEvent.toString()); 1142 event.setTimestamp(System.currentTimeMillis()); 1143 entity.addEvent(event); 1144 try { 1145 timelineClient.putEntities(entity); 1146 } catch (YarnException | IOException e) { 1147 LOG.error("App Attempt " 1148 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? 
"start" : "end") 1149 + " event could not be published for " 1150 + appAttemptId.toString(), e); 1151 } 1152 } 1153}