001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.IOException; 023import java.io.InputStreamReader; 024import java.net.InetSocketAddress; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Vector; 032import java.util.concurrent.CopyOnWriteArrayList; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import org.apache.commons.cli.CommandLine; 036import org.apache.commons.cli.GnuParser; 037import org.apache.commons.cli.HelpFormatter; 038import org.apache.commons.cli.Options; 039import org.apache.commons.cli.ParseException; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042 043import org.apache.hadoop.classification.InterfaceAudience; 044import org.apache.hadoop.classification.InterfaceStability; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.net.NetUtils; 047import org.apache.hadoop.security.UserGroupInformation; 048import org.apache.hadoop.yarn.api.AMRMProtocol; 049import org.apache.hadoop.yarn.api.ApplicationConstants; 050import org.apache.hadoop.yarn.api.ContainerManager; 051 052import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 053import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 054import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 055//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; 056//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; 057import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; 058import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 059import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 060 061import org.apache.hadoop.yarn.api.records.AMResponse; 062import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 063import org.apache.hadoop.yarn.api.records.Container; 064import org.apache.hadoop.yarn.api.records.ContainerId; 065import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 066import org.apache.hadoop.yarn.api.records.ContainerState; 067import org.apache.hadoop.yarn.api.records.ContainerStatus; 068import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 069import org.apache.hadoop.yarn.api.records.LocalResource; 070import org.apache.hadoop.yarn.api.records.LocalResourceType; 071import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 072import org.apache.hadoop.yarn.api.records.Priority; 073import org.apache.hadoop.yarn.api.records.Resource; 074import org.apache.hadoop.yarn.api.records.ResourceRequest; 075import org.apache.hadoop.yarn.conf.YarnConfiguration; 076import org.apache.hadoop.yarn.exceptions.YarnRemoteException; 077import org.apache.hadoop.yarn.ipc.YarnRPC; 078import org.apache.hadoop.yarn.util.ConverterUtils; 079import org.apache.hadoop.yarn.util.Records; 080 081/** 082 * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 083 * 084 * <p>This class is meant to act as an example on how to write yarn-based application masters. </p> 085 * 086 * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 087 * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 088 * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code> 089 * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client 090 * as well as a tracking url that a client can use to keep track of status/job history if needed. </p> 091 * 092 * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals 093 * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the 094 * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat. 095 * 096 * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the 097 * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest} 098 * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements. 099 * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 100 * of the set of newly allocated containers, completed containers as well as current state of available resources. </p> 101 * 102 * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 103 * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 104 * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 105 * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p> 106 * 107 * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code> 108 * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager} 109 * by querying for the status of the allocated container's {@link ContainerId}. 110 * 111 * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest} 112 * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. 113 */ 114@InterfaceAudience.Public 115@InterfaceStability.Unstable 116public class ApplicationMaster { 117 118 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 119 120 // Configuration 121 private Configuration conf; 122 // YARN RPC to communicate with the Resource Manager or Node Manager 123 private YarnRPC rpc; 124 125 // Handle to communicate with the Resource Manager 126 private AMRMProtocol resourceManager; 127 128 // Application Attempt Id ( combination of attemptId and fail count ) 129 private ApplicationAttemptId appAttemptID; 130 131 // TODO 132 // For status update for clients - yet to be implemented 133 // Hostname of the container 134 private String appMasterHostname = ""; 135 // Port on which the app master listens for status update requests from clients 136 private int appMasterRpcPort = 0; 137 // Tracking url to which app master publishes info for clients to monitor 138 private String appMasterTrackingUrl = ""; 139 140 // App Master configuration 141 // No. of containers to run shell command on 142 private int numTotalContainers = 1; 143 // Memory to request for the container on which the shell command will run 144 private int containerMemory = 10; 145 // Priority of the request 146 private int requestPriority; 147 148 // Incremental counter for rpc calls to the RM 149 private AtomicInteger rmRequestID = new AtomicInteger(); 150 151 // Simple flag to denote whether all works is done 152 private boolean appDone = false; 153 // Counter for completed containers ( complete denotes successful or failed ) 154 private AtomicInteger numCompletedContainers = new AtomicInteger(); 155 // Allocated container count so that we know how many containers has the RM 156 // allocated to us 157 private AtomicInteger numAllocatedContainers = new AtomicInteger(); 158 // Count of failed containers 159 private AtomicInteger numFailedContainers = new AtomicInteger(); 160 // Count of containers already requested from the RM 161 // Needed as once requested, we should not request for containers again and again. 162 // Only request for more if the original requirement changes. 163 private AtomicInteger numRequestedContainers = new AtomicInteger(); 164 165 // Shell command to be executed 166 private String shellCommand = ""; 167 // Args to be passed to the shell command 168 private String shellArgs = ""; 169 // Env variables to be setup for the shell command 170 private Map<String, String> shellEnv = new HashMap<String, String>(); 171 172 // Location of shell script ( obtained from info set in env ) 173 // Shell script path in fs 174 private String shellScriptPath = ""; 175 // Timestamp needed for creating a local resource 176 private long shellScriptPathTimestamp = 0; 177 // File length needed for local resource 178 private long shellScriptPathLen = 0; 179 180 // Hardcoded path to shell script in launch container's local env 181 private final String ExecShellStringPath = "ExecShellScript.sh"; 182 183 // Containers to be released 184 private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>(); 185 186 // Launch threads 187 private List<Thread> launchThreads = new ArrayList<Thread>(); 188 189 /** 190 * @param args Command line args 191 */ 192 public static void main(String[] args) { 193 boolean result = false; 194 try { 195 ApplicationMaster appMaster = new ApplicationMaster(); 196 LOG.info("Initializing ApplicationMaster"); 197 boolean doRun = appMaster.init(args); 198 if (!doRun) { 199 System.exit(0); 200 } 201 result = appMaster.run(); 202 } catch (Throwable t) { 203 LOG.fatal("Error running ApplicationMaster", t); 204 System.exit(1); 205 } 206 if (result) { 207 LOG.info("Application Master completed successfully. exiting"); 208 System.exit(0); 209 } 210 else { 211 LOG.info("Application Master failed. exiting"); 212 System.exit(2); 213 } 214 } 215 216 /** 217 * Dump out contents of $CWD and the environment to stdout for debugging 218 */ 219 private void dumpOutDebugInfo() { 220 221 LOG.info("Dump debug output"); 222 Map<String, String> envs = System.getenv(); 223 for (Map.Entry<String, String> env : envs.entrySet()) { 224 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 225 System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue()); 226 } 227 228 String cmd = "ls -al"; 229 Runtime run = Runtime.getRuntime(); 230 Process pr = null; 231 try { 232 pr = run.exec(cmd); 233 pr.waitFor(); 234 235 BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream())); 236 String line = ""; 237 while ((line=buf.readLine())!=null) { 238 LOG.info("System CWD content: " + line); 239 System.out.println("System CWD content: " + line); 240 } 241 buf.close(); 242 } catch (IOException e) { 243 e.printStackTrace(); 244 } catch (InterruptedException e) { 245 e.printStackTrace(); 246 } 247 } 248 249 public ApplicationMaster() throws Exception { 250 // Set up the configuration and RPC 251 conf = new Configuration(); 252 rpc = YarnRPC.create(conf); 253 } 254 /** 255 * Parse command line options 256 * @param args Command line args 257 * @return Whether init successful and run should be invoked 258 * @throws ParseException 259 * @throws IOException 260 */ 261 public boolean init(String[] args) throws ParseException, IOException { 262 263 Options opts = new Options(); 264 opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes"); 265 opts.addOption("shell_command", true, "Shell command to be executed by the Application Master"); 266 opts.addOption("shell_script", true, "Location of the shell script to be executed"); 267 opts.addOption("shell_args", true, "Command line args for the shell script"); 268 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); 269 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); 270 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); 271 opts.addOption("priority", true, "Application Priority. Default 0"); 272 opts.addOption("debug", false, "Dump out debug information"); 273 274 opts.addOption("help", false, "Print usage"); 275 CommandLine cliParser = new GnuParser().parse(opts, args); 276 277 if (args.length == 0) { 278 printUsage(opts); 279 throw new IllegalArgumentException("No args specified for application master to initialize"); 280 } 281 282 if (cliParser.hasOption("help")) { 283 printUsage(opts); 284 return false; 285 } 286 287 if (cliParser.hasOption("debug")) { 288 dumpOutDebugInfo(); 289 } 290 291 Map<String, String> envs = System.getenv(); 292 293 appAttemptID = Records.newRecord(ApplicationAttemptId.class); 294 if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { 295 if (cliParser.hasOption("app_attempt_id")) { 296 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 297 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 298 } 299 else { 300 throw new IllegalArgumentException("Application Attempt Id not set in the environment"); 301 } 302 } else { 303 ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); 304 appAttemptID = containerId.getApplicationAttemptId(); 305 } 306 307 LOG.info("Application master for app" 308 + ", appId=" + appAttemptID.getApplicationId().getId() 309 + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() 310 + ", attemptId=" + appAttemptID.getAttemptId()); 311 312 if (!cliParser.hasOption("shell_command")) { 313 throw new IllegalArgumentException("No shell command specified to be executed by application master"); 314 } 315 shellCommand = cliParser.getOptionValue("shell_command"); 316 317 if (cliParser.hasOption("shell_args")) { 318 shellArgs = cliParser.getOptionValue("shell_args"); 319 } 320 if (cliParser.hasOption("shell_env")) { 321 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 322 for (String env : shellEnvs) { 323 env = env.trim(); 324 int index = env.indexOf('='); 325 if (index == -1) { 326 shellEnv.put(env, ""); 327 continue; 328 } 329 String key = env.substring(0, index); 330 String val = ""; 331 if (index < (env.length()-1)) { 332 val = env.substring(index+1); 333 } 334 shellEnv.put(key, val); 335 } 336 } 337 338 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 339 shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 340 341 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 342 shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 343 } 344 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 345 shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 346 } 347 348 if (!shellScriptPath.isEmpty() 349 && (shellScriptPathTimestamp <= 0 350 || shellScriptPathLen <= 0)) { 351 LOG.error("Illegal values in env for shell script path" 352 + ", path=" + shellScriptPath 353 + ", len=" + shellScriptPathLen 354 + ", timestamp=" + shellScriptPathTimestamp); 355 throw new IllegalArgumentException("Illegal values in env for shell script path"); 356 } 357 } 358 359 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); 360 numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); 361 requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); 362 363 return true; 364 } 365 366 /** 367 * Helper function to print usage 368 * @param opts Parsed command line options 369 */ 370 private void printUsage(Options opts) { 371 new HelpFormatter().printHelp("ApplicationMaster", opts); 372 } 373 374 /** 375 * Main run function for the application master 376 * @throws YarnRemoteException 377 */ 378 public boolean run() throws YarnRemoteException { 379 LOG.info("Starting ApplicationMaster"); 380 381 // Connect to ResourceManager 382 resourceManager = connectToRM(); 383 384 // Setup local RPC Server to accept status requests directly from clients 385 // TODO need to setup a protocol for client to be able to communicate to the RPC server 386 // TODO use the rpc port info to register with the RM for the client to send requests to this app master 387 388 // Register self with ResourceManager 389 RegisterApplicationMasterResponse response = registerToRM(); 390 // Dump out information about cluster capability as seen by the resource manager 391 int minMem = response.getMinimumResourceCapability().getMemory(); 392 int maxMem = response.getMaximumResourceCapability().getMemory(); 393 LOG.info("Min mem capabililty of resources in this cluster " + minMem); 394 LOG.info("Max mem capabililty of resources in this cluster " + maxMem); 395 396 // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 397 // a multiple of the min value and cannot exceed the max. 398 // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min 399 if (containerMemory < minMem) { 400 LOG.info("Container memory specified below min threshold of cluster. Using min value." 401 + ", specified=" + containerMemory 402 + ", min=" + minMem); 403 containerMemory = minMem; 404 } 405 else if (containerMemory > maxMem) { 406 LOG.info("Container memory specified above max threshold of cluster. Using max value." 407 + ", specified=" + containerMemory 408 + ", max=" + maxMem); 409 containerMemory = maxMem; 410 } 411 412 // Setup heartbeat emitter 413 // TODO poll RM every now and then with an empty request to let RM know that we are alive 414 // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 415 // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS 416 // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 417 // is not required. 418 419 // Setup ask for containers from RM 420 // Send request for containers to RM 421 // Until we get our fully allocated quota, we keep on polling RM for containers 422 // Keep looping until all the containers are launched and shell script executed on them 423 // ( regardless of success/failure). 424 425 int loopCounter = -1; 426 427 while (numCompletedContainers.get() < numTotalContainers 428 && !appDone) { 429 loopCounter++; 430 431 // log current state 432 LOG.info("Current application state: loop=" + loopCounter 433 + ", appDone=" + appDone 434 + ", total=" + numTotalContainers 435 + ", requested=" + numRequestedContainers 436 + ", completed=" + numCompletedContainers 437 + ", failed=" + numFailedContainers 438 + ", currentAllocated=" + numAllocatedContainers); 439 440 // Sleep before each loop when asking RM for containers 441 // to avoid flooding RM with spurious requests when it 442 // need not have any available containers 443 // Sleeping for 1000 ms. 444 try { 445 Thread.sleep(1000); 446 } catch (InterruptedException e) { 447 LOG.info("Sleep interrupted " + e.getMessage()); 448 } 449 450 // No. of containers to request 451 // For the first loop, askCount will be equal to total containers needed 452 // From that point on, askCount will always be 0 as current implementation 453 // does not change its ask on container failures. 454 int askCount = numTotalContainers - numRequestedContainers.get(); 455 numRequestedContainers.addAndGet(askCount); 456 457 // Setup request to be sent to RM to allocate containers 458 List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>(); 459 if (askCount > 0) { 460 ResourceRequest containerAsk = setupContainerAskForRM(askCount); 461 resourceReq.add(containerAsk); 462 } 463 464 // Send the request to RM 465 LOG.info("Asking RM for containers" 466 + ", askCount=" + askCount); 467 AMResponse amResp =sendContainerAskToRM(resourceReq); 468 469 // Retrieve list of allocated containers from the response 470 List<Container> allocatedContainers = amResp.getAllocatedContainers(); 471 LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); 472 numAllocatedContainers.addAndGet(allocatedContainers.size()); 473 for (Container allocatedContainer : allocatedContainers) { 474 LOG.info("Launching shell command on a new container." 475 + ", containerId=" + allocatedContainer.getId() 476 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 477 + ":" + allocatedContainer.getNodeId().getPort() 478 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 479 + ", containerState" + allocatedContainer.getState() 480 + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); 481 //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); 482 483 LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer); 484 Thread launchThread = new Thread(runnableLaunchContainer); 485 486 // launch and start the container on a separate thread to keep the main thread unblocked 487 // as all containers may not be allocated at one go. 488 launchThreads.add(launchThread); 489 launchThread.start(); 490 } 491 492 // Check what the current available resources in the cluster are 493 // TODO should we do anything if the available resources are not enough? 494 Resource availableResources = amResp.getAvailableResources(); 495 LOG.info("Current available resources in the cluster " + availableResources); 496 497 // Check the completed containers 498 List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses(); 499 LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); 500 for (ContainerStatus containerStatus : completedContainers) { 501 LOG.info("Got container status for containerID= " + containerStatus.getContainerId() 502 + ", state=" + containerStatus.getState() 503 + ", exitStatus=" + containerStatus.getExitStatus() 504 + ", diagnostics=" + containerStatus.getDiagnostics()); 505 506 // non complete containers should not be here 507 assert(containerStatus.getState() == ContainerState.COMPLETE); 508 509 // increment counters for completed/failed containers 510 int exitStatus = containerStatus.getExitStatus(); 511 if (0 != exitStatus) { 512 // container failed 513 if (-100 != exitStatus) { 514 // shell script failed 515 // counts as completed 516 numCompletedContainers.incrementAndGet(); 517 numFailedContainers.incrementAndGet(); 518 } 519 else { 520 // something else bad happened 521 // app job did not complete for some reason 522 // we should re-try as the container was lost for some reason 523 numAllocatedContainers.decrementAndGet(); 524 numRequestedContainers.decrementAndGet(); 525 // we do not need to release the container as it would be done 526 // by the RM/CM. 527 } 528 } 529 else { 530 // nothing to do 531 // container completed successfully 532 numCompletedContainers.incrementAndGet(); 533 LOG.info("Container completed successfully." 534 + ", containerId=" + containerStatus.getContainerId()); 535 } 536 537 } 538 if (numCompletedContainers.get() == numTotalContainers) { 539 appDone = true; 540 } 541 542 LOG.info("Current application state: loop=" + loopCounter 543 + ", appDone=" + appDone 544 + ", total=" + numTotalContainers 545 + ", requested=" + numRequestedContainers 546 + ", completed=" + numCompletedContainers 547 + ", failed=" + numFailedContainers 548 + ", currentAllocated=" + numAllocatedContainers); 549 550 // TODO 551 // Add a timeout handling layer 552 // for misbehaving shell commands 553 } 554 555 // Join all launched threads 556 // needed for when we time out 557 // and we need to release containers 558 for (Thread launchThread : launchThreads) { 559 try { 560 launchThread.join(10000); 561 } catch (InterruptedException e) { 562 LOG.info("Exception thrown in thread join: " + e.getMessage()); 563 e.printStackTrace(); 564 } 565 } 566 567 // When the application completes, it should send a finish application signal 568 // to the RM 569 LOG.info("Application completed. Signalling finish to RM"); 570 571 FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); 572 finishReq.setAppAttemptId(appAttemptID); 573 boolean isSuccess = true; 574 if (numFailedContainers.get() == 0) { 575 finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); 576 } 577 else { 578 finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); 579 String diagnostics = "Diagnostics." 580 + ", total=" + numTotalContainers 581 + ", completed=" + numCompletedContainers.get() 582 + ", allocated=" + numAllocatedContainers.get() 583 + ", failed=" + numFailedContainers.get(); 584 finishReq.setDiagnostics(diagnostics); 585 isSuccess = false; 586 } 587 resourceManager.finishApplicationMaster(finishReq); 588 return isSuccess; 589 } 590 591 /** 592 * Thread to connect to the {@link ContainerManager} and 593 * launch the container that will execute the shell command. 594 */ 595 private class LaunchContainerRunnable implements Runnable { 596 597 // Allocated container 598 Container container; 599 // Handle to communicate with ContainerManager 600 ContainerManager cm; 601 602 /** 603 * @param lcontainer Allocated container 604 */ 605 public LaunchContainerRunnable(Container lcontainer) { 606 this.container = lcontainer; 607 } 608 609 /** 610 * Helper function to connect to CM 611 */ 612 private void connectToCM() { 613 LOG.debug("Connecting to ContainerManager for containerid=" + container.getId()); 614 String cmIpPortStr = container.getNodeId().getHost() + ":" 615 + container.getNodeId().getPort(); 616 InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); 617 LOG.info("Connecting to ContainerManager at " + cmIpPortStr); 618 this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf)); 619 } 620 621 622 @Override 623 /** 624 * Connects to CM, sets up container launch context 625 * for shell command and eventually dispatches the container 626 * start request to the CM. 627 */ 628 public void run() { 629 // Connect to ContainerManager 630 connectToCM(); 631 632 LOG.info("Setting up container launch container for containerid=" + container.getId()); 633 ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); 634 635 ctx.setContainerId(container.getId()); 636 ctx.setResource(container.getResource()); 637 638 try { 639 ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); 640 } catch (IOException e) { 641 LOG.info("Getting current user info failed when trying to launch the container" 642 + e.getMessage()); 643 } 644 645 // Set the environment 646 ctx.setEnvironment(shellEnv); 647 648 // Set the local resources 649 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 650 651 // The container for the eventual shell commands needs its own local resources too. 652 // In this scenario, if a shell script is specified, we need to have it copied 653 // and made available to the container. 654 if (!shellScriptPath.isEmpty()) { 655 LocalResource shellRsrc = Records.newRecord(LocalResource.class); 656 shellRsrc.setType(LocalResourceType.FILE); 657 shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); 658 try { 659 shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath))); 660 } catch (URISyntaxException e) { 661 LOG.error("Error when trying to use shell script path specified in env" 662 + ", path=" + shellScriptPath); 663 e.printStackTrace(); 664 665 // A failure scenario on bad input such as invalid shell script path 666 // We know we cannot continue launching the container 667 // so we should release it. 668 // TODO 669 numCompletedContainers.incrementAndGet(); 670 numFailedContainers.incrementAndGet(); 671 return; 672 } 673 shellRsrc.setTimestamp(shellScriptPathTimestamp); 674 shellRsrc.setSize(shellScriptPathLen); 675 localResources.put(ExecShellStringPath, shellRsrc); 676 } 677 ctx.setLocalResources(localResources); 678 679 // Set the necessary command to execute on the allocated container 680 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 681 682 // Set executable command 683 vargs.add(shellCommand); 684 // Set shell script path 685 if (!shellScriptPath.isEmpty()) { 686 vargs.add(ExecShellStringPath); 687 } 688 689 // Set args for the shell command if any 690 vargs.add(shellArgs); 691 // Add log redirect params 692 // TODO 693 // We should redirect the output to hdfs instead of local logs 694 // so as to be able to look at the final output after the containers 695 // have been released. 696 // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 697 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 698 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 699 700 // Get final commmand 701 StringBuilder command = new StringBuilder(); 702 for (CharSequence str : vargs) { 703 command.append(str).append(" "); 704 } 705 706 List<String> commands = new ArrayList<String>(); 707 commands.add(command.toString()); 708 ctx.setCommands(commands); 709 710 StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class); 711 startReq.setContainerLaunchContext(ctx); 712 try { 713 cm.startContainer(startReq); 714 } catch (YarnRemoteException e) { 715 LOG.info("Start container failed for :" 716 + ", containerId=" + container.getId()); 717 e.printStackTrace(); 718 // TODO do we need to release this container? 719 } 720 721 // Get container status? 722 // Left commented out as the shell scripts are short lived 723 // and we are relying on the status for completed containers from RM to detect status 724 725 // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class); 726 // statusReq.setContainerId(container.getId()); 727 // GetContainerStatusResponse statusResp; 728 //try { 729 //statusResp = cm.getContainerStatus(statusReq); 730 // LOG.info("Container Status" 731 // + ", id=" + container.getId() 732 // + ", status=" +statusResp.getStatus()); 733 //} catch (YarnRemoteException e) { 734 //e.printStackTrace(); 735 //} 736 } 737 } 738 739 /** 740 * Connect to the Resource Manager 741 * @return Handle to communicate with the RM 742 */ 743 private AMRMProtocol connectToRM() { 744 YarnConfiguration yarnConf = new YarnConfiguration(conf); 745 InetSocketAddress rmAddress = yarnConf.getSocketAddr( 746 YarnConfiguration.RM_SCHEDULER_ADDRESS, 747 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 748 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); 749 LOG.info("Connecting to ResourceManager at " + rmAddress); 750 return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf)); 751 } 752 753 /** 754 * Register the Application Master to the Resource Manager 755 * @return the registration response from the RM 756 * @throws YarnRemoteException 757 */ 758 private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { 759 RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); 760 761 // set the required info into the registration request: 762 // application attempt id, 763 // host on which the app master is running 764 // rpc port on which the app master accepts requests from the client 765 // tracking url for the app master 766 appMasterRequest.setApplicationAttemptId(appAttemptID); 767 appMasterRequest.setHost(appMasterHostname); 768 appMasterRequest.setRpcPort(appMasterRpcPort); 769 appMasterRequest.setTrackingUrl(appMasterTrackingUrl); 770 771 return resourceManager.registerApplicationMaster(appMasterRequest); 772 } 773 774 /** 775 * Setup the request that will be sent to the RM for the container ask. 776 * @param numContainers Containers to ask for from RM 777 * @return the setup ResourceRequest to be sent to RM 778 */ 779 private ResourceRequest setupContainerAskForRM(int numContainers) { 780 ResourceRequest request = Records.newRecord(ResourceRequest.class); 781 782 // setup requirements for hosts 783 // whether a particular rack/host is needed 784 // Refer to apis under org.apache.hadoop.net for more 785 // details on how to get figure out rack/host mapping. 786 // using * as any host will do for the distributed shell app 787 request.setHostName("*"); 788 789 // set no. of containers needed 790 request.setNumContainers(numContainers); 791 792 // set the priority for the request 793 Priority pri = Records.newRecord(Priority.class); 794 // TODO - what is the range for priority? how to decide? 795 pri.setPriority(requestPriority); 796 request.setPriority(pri); 797 798 // Set up resource type requirements 799 // For now, only memory is supported so we set memory requirements 800 Resource capability = Records.newRecord(Resource.class); 801 capability.setMemory(containerMemory); 802 request.setCapability(capability); 803 804 return request; 805 } 806 807 /** 808 * Ask RM to allocate given no. of containers to this Application Master 809 * @param requestedContainers Containers to ask for from RM 810 * @return Response from RM to AM with allocated containers 811 * @throws YarnRemoteException 812 */ 813 private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) 814 throws YarnRemoteException { 815 AllocateRequest req = Records.newRecord(AllocateRequest.class); 816 req.setResponseId(rmRequestID.incrementAndGet()); 817 req.setApplicationAttemptId(appAttemptID); 818 req.addAllAsks(requestedContainers); 819 req.addAllReleases(releasedContainers); 820 req.setProgress((float)numCompletedContainers.get()/numTotalContainers); 821 822 LOG.info("Sending request to RM for containers" 823 + ", requestedSet=" + requestedContainers.size() 824 + ", releasedSet=" + releasedContainers.size() 825 + ", progress=" + req.getProgress()); 826 827 for (ResourceRequest rsrcReq : requestedContainers) { 828 LOG.info("Requested container ask: " + rsrcReq.toString()); 829 } 830 for (ContainerId id : releasedContainers) { 831 LOG.info("Released container, id=" + id.getId()); 832 } 833 834 AllocateResponse resp = resourceManager.allocate(req); 835 return resp.getAMResponse(); 836 } 837}