001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
021 import java.io.BufferedReader;
022 import java.io.DataInputStream;
023 import java.io.File;
024 import java.io.FileInputStream;
025 import java.io.IOException;
026 import java.io.StringReader;
027 import java.net.URI;
028 import java.net.URISyntaxException;
029 import java.nio.ByteBuffer;
030 import java.security.PrivilegedExceptionAction;
031 import java.util.ArrayList;
032 import java.util.HashMap;
033 import java.util.Iterator;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Vector;
037 import java.util.concurrent.ConcurrentHashMap;
038 import java.util.concurrent.ConcurrentMap;
039 import java.util.concurrent.atomic.AtomicInteger;
040
041 import org.apache.commons.cli.CommandLine;
042 import org.apache.commons.cli.GnuParser;
043 import org.apache.commons.cli.HelpFormatter;
044 import org.apache.commons.cli.Options;
045 import org.apache.commons.cli.ParseException;
046 import org.apache.commons.logging.Log;
047 import org.apache.commons.logging.LogFactory;
048 import org.apache.hadoop.classification.InterfaceAudience;
049 import org.apache.hadoop.classification.InterfaceAudience.Private;
050 import org.apache.hadoop.classification.InterfaceStability;
051 import org.apache.hadoop.conf.Configuration;
052 import org.apache.hadoop.fs.FileSystem;
053 import org.apache.hadoop.fs.Path;
054 import org.apache.hadoop.io.DataOutputBuffer;
055 import org.apache.hadoop.io.IOUtils;
056 import org.apache.hadoop.net.NetUtils;
057 import org.apache.hadoop.security.Credentials;
058 import org.apache.hadoop.security.UserGroupInformation;
059 import org.apache.hadoop.security.token.Token;
060 import org.apache.hadoop.util.ExitUtil;
061 import org.apache.hadoop.util.Shell;
062 import org.apache.hadoop.yarn.api.ApplicationConstants;
063 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
064 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
065 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
066 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
067 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
068 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
069 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
070 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
071 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
072 import org.apache.hadoop.yarn.api.records.Container;
073 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
074 import org.apache.hadoop.yarn.api.records.ContainerId;
075 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
076 import org.apache.hadoop.yarn.api.records.ContainerState;
077 import org.apache.hadoop.yarn.api.records.ContainerStatus;
078 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
079 import org.apache.hadoop.yarn.api.records.LocalResource;
080 import org.apache.hadoop.yarn.api.records.LocalResourceType;
081 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
082 import org.apache.hadoop.yarn.api.records.NodeReport;
083 import org.apache.hadoop.yarn.api.records.Priority;
084 import org.apache.hadoop.yarn.api.records.Resource;
085 import org.apache.hadoop.yarn.api.records.ResourceRequest;
086 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
087 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
088 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
089 import org.apache.hadoop.yarn.client.api.TimelineClient;
090 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
091 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
092 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
093 import org.apache.hadoop.yarn.conf.YarnConfiguration;
094 import org.apache.hadoop.yarn.exceptions.YarnException;
095 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
096 import org.apache.hadoop.yarn.util.ConverterUtils;
097 import org.apache.hadoop.yarn.util.Records;
098 import org.apache.log4j.LogManager;
099
100 import com.google.common.annotations.VisibleForTesting;
101
102 /**
103 * An ApplicationMaster for executing shell commands on a set of launched
104 * containers using the YARN framework.
105 *
106 * <p>
107 * This class is meant to act as an example on how to write yarn-based
108 * application masters.
109 * </p>
110 *
111 * <p>
112 * The ApplicationMaster is started on a container by the
113 * <code>ResourceManager</code>'s launcher. The first thing that the
114 * <code>ApplicationMaster</code> needs to do is to connect and register itself
115 * with the <code>ResourceManager</code>. The registration sets up information
116 * within the <code>ResourceManager</code> regarding what host:port the
117 * ApplicationMaster is listening on to provide any form of functionality to a
118 * client as well as a tracking url that a client can use to keep track of
119 * status/job history if needed. However, in the distributedshell, trackingurl
120 * and appMasterHost:appMasterRpcPort are not supported.
121 * </p>
122 *
123 * <p>
124 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
125 * <code>ResourceManager</code> at regular intervals to inform the
126 * <code>ResourceManager</code> that it is up and alive. The
127 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
128 * <code>ApplicationMaster</code> acts as a heartbeat.
129 *
130 * <p>
131 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
132 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
133 * required no. of containers using {@link ResourceRequest} with the necessary
134 * resource specifications such as node location, computational
135 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
136 * responds with an {@link AllocateResponse} that informs the
137 * <code>ApplicationMaster</code> of the set of newly allocated containers,
138 * completed containers as well as current state of available resources.
139 * </p>
140 *
141 * <p>
142 * For each allocated container, the <code>ApplicationMaster</code> can then set
143 * up the necessary launch context via {@link ContainerLaunchContext} to specify
144 * the allocated container id, local resources required by the executable, the
145 * environment to be setup for the executable, commands to execute, etc. and
146 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
147 * launch and execute the defined commands on the given allocated container.
148 * </p>
149 *
150 * <p>
151 * The <code>ApplicationMaster</code> can monitor the launched container by
152 * either querying the <code>ResourceManager</code> using
153 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
154 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
155 * container's {@link ContainerId}.
156 *
157 * <p>
158 * After the job has been completed, the <code>ApplicationMaster</code> has to
159 * send a {@link FinishApplicationMasterRequest} to the
160 * <code>ResourceManager</code> to inform it that the
161 * <code>ApplicationMaster</code> has been completed.
162 */
163 @InterfaceAudience.Public
164 @InterfaceStability.Unstable
165 public class ApplicationMaster {
166
167 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
168
169 @VisibleForTesting
170 @Private
171 public static enum DSEvent {
172 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
173 }
174
175 @VisibleForTesting
176 @Private
177 public static enum DSEntity {
178 DS_APP_ATTEMPT, DS_CONTAINER
179 }
180
181 // Configuration
// Created as a YarnConfiguration in the constructor and passed to every
// client that init() and run() initialize.
182 private Configuration conf;
183
184 // Handle to communicate with the Resource Manager
// Created and started in run(); stopped in finish() and in the RM callback's
// onError().
185 @SuppressWarnings("rawtypes")
186 private AMRMClientAsync amRMClient;
187
188 // In both secure and non-secure modes, this points to the job-submitter.
// Built in run() from the USER environment variable.
189 private UserGroupInformation appSubmitterUgi;
190
191 // Handle to communicate with the Node Manager
192 private NMClientAsync nmClientAsync;
193 // Listen to process the response from the Node Manager
194 private NMCallbackHandler containerListener;
195
196 // Application Attempt Id ( combination of attemptId and fail count )
// Resolved in init() from the CONTAINER_ID environment variable, or from the
// app_attempt_id command line option when that variable is absent.
197 @VisibleForTesting
198 protected ApplicationAttemptId appAttemptID;
199
200 // TODO
201 // For status update for clients - yet to be implemented
202 // Hostname of the container
203 private String appMasterHostname = "";
204 // Port on which the app master listens for status updates from clients
205 private int appMasterRpcPort = -1;
206 // Tracking url to which app master publishes info for clients to monitor
207 private String appMasterTrackingUrl = "";
208
209 // App Master configuration
// The initial values below are overwritten by init() from the command line.
210 // No. of containers to run shell command on
211 private int numTotalContainers = 1;
212 // Memory to request for the container on which the shell command will run
213 private int containerMemory = 10;
214 // VirtualCores to request for the container on which the shell command will run
215 private int containerVirtualCores = 1;
216 // Priority of the request
217 private int requestPriority;
218
219 // Counter for completed containers ( complete denotes successful or failed )
220 private AtomicInteger numCompletedContainers = new AtomicInteger();
221 // Allocated container count so that we know how many containers has the RM
222 // allocated to us
223 @VisibleForTesting
224 protected AtomicInteger numAllocatedContainers = new AtomicInteger();
225 // Count of failed containers
226 private AtomicInteger numFailedContainers = new AtomicInteger();
227 // Count of containers already requested from the RM
228 // Needed as once requested, we should not request for containers again.
229 // Only request for more if the original requirement changes.
230 @VisibleForTesting
231 protected AtomicInteger numRequestedContainers = new AtomicInteger();
232
233 // Shell command to be executed
234 private String shellCommand = "";
235 // Args to be passed to the shell command
236 private String shellArgs = "";
237 // Env variables to be setup for the shell command
238 private Map<String, String> shellEnv = new HashMap<String, String>();
239
240 // Location of shell script ( obtained from info set in env )
241 // Shell script path in fs
242 private String scriptPath = "";
243 // Timestamp needed for creating a local resource
244 private long shellScriptPathTimestamp = 0;
245 // File length needed for local resource
246 private long shellScriptPathLen = 0;
247
248 // Hardcoded path to shell script in launch container's local env
249 private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
250 private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
251 + ".bat";
252
253 // Hardcoded path to custom log_properties
254 private static final String log4jPath = "log4j.properties";
255
// Names of the local files from which init() loads the shell command and its
// arguments when those files exist.
256 private static final String shellCommandPath = "shellCommands";
257 private static final String shellArgsPath = "shellArgs";
258
// Set to true by the RM callback handler once every container has completed,
// on a shutdown request, or on an error; polled by finish().
259 private volatile boolean done;
260
// Wraps the bytes that run() serializes from the current user's credentials.
261 private ByteBuffer allTokens;
262
263 // Launch threads
// One thread is added per allocated container by the RM callback handler;
// finish() joins each of them with a timeout.
264 private List<Thread> launchThreads = new ArrayList<Thread>();
265
266 // Timeline Client
// Created, initialized and started at the end of init().
267 private TimelineClient timelineClient;
268
// Command interpreter prefixes; presumably used when building the container
// launch command (the usage is outside this section -- TODO confirm).
269 private final String linux_bash_command = "bash";
270 private final String windows_command = "cmd /c";
271
272 /**
273 * @param args Command line args
274 */
275 public static void main(String[] args) {
276 boolean result = false;
277 try {
278 ApplicationMaster appMaster = new ApplicationMaster();
279 LOG.info("Initializing ApplicationMaster");
280 boolean doRun = appMaster.init(args);
281 if (!doRun) {
282 System.exit(0);
283 }
284 appMaster.run();
285 result = appMaster.finish();
286 } catch (Throwable t) {
287 LOG.fatal("Error running ApplicationMaster", t);
288 LogManager.shutdown();
289 ExitUtil.terminate(1, t);
290 }
291 if (result) {
292 LOG.info("Application Master completed successfully. exiting");
293 System.exit(0);
294 } else {
295 LOG.info("Application Master failed. exiting");
296 System.exit(2);
297 }
298 }
299
300 /**
301 * Dump out contents of $CWD and the environment to stdout for debugging
302 */
303 private void dumpOutDebugInfo() {
304
305 LOG.info("Dump debug output");
306 Map<String, String> envs = System.getenv();
307 for (Map.Entry<String, String> env : envs.entrySet()) {
308 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
309 System.out.println("System env: key=" + env.getKey() + ", val="
310 + env.getValue());
311 }
312
313 BufferedReader buf = null;
314 try {
315 String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
316 Shell.execCommand("ls", "-al");
317 buf = new BufferedReader(new StringReader(lines));
318 String line = "";
319 while ((line = buf.readLine()) != null) {
320 LOG.info("System CWD content: " + line);
321 System.out.println("System CWD content: " + line);
322 }
323 } catch (IOException e) {
324 e.printStackTrace();
325 } finally {
326 IOUtils.cleanup(LOG, buf);
327 }
328 }
329
/**
 * Creates an application master whose configuration is a fresh
 * {@link YarnConfiguration}.
 */
public ApplicationMaster() {
  this.conf = new YarnConfiguration();
}
334
335 /**
336 * Parse command line options
337 *
338 * @param args Command line args
339 * @return Whether init successful and run should be invoked
340 * @throws ParseException
341 * @throws IOException
342 */
343 public boolean init(String[] args) throws ParseException, IOException {
344 Options opts = new Options();
345 opts.addOption("app_attempt_id", true,
346 "App Attempt ID. Not to be used unless for testing purposes");
347 opts.addOption("shell_env", true,
348 "Environment for shell script. Specified as env_key=env_val pairs");
349 opts.addOption("container_memory", true,
350 "Amount of memory in MB to be requested to run the shell command");
351 opts.addOption("container_vcores", true,
352 "Amount of virtual cores to be requested to run the shell command");
353 opts.addOption("num_containers", true,
354 "No. of containers on which the shell command needs to be executed");
355 opts.addOption("priority", true, "Application Priority. Default 0");
356 opts.addOption("debug", false, "Dump out debug information");
357
358 opts.addOption("help", false, "Print usage");
359 CommandLine cliParser = new GnuParser().parse(opts, args);
360
361 if (args.length == 0) {
362 printUsage(opts);
363 throw new IllegalArgumentException(
364 "No args specified for application master to initialize");
365 }
366
367 //Check whether customer log4j.properties file exists
368 if (fileExist(log4jPath)) {
369 try {
370 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
371 log4jPath);
372 } catch (Exception e) {
373 LOG.warn("Can not set up custom log4j properties. " + e);
374 }
375 }
376
377 if (cliParser.hasOption("help")) {
378 printUsage(opts);
379 return false;
380 }
381
382 if (cliParser.hasOption("debug")) {
383 dumpOutDebugInfo();
384 }
385
386 Map<String, String> envs = System.getenv();
387
388 if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
389 if (cliParser.hasOption("app_attempt_id")) {
390 String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
391 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
392 } else {
393 throw new IllegalArgumentException(
394 "Application Attempt Id not set in the environment");
395 }
396 } else {
397 ContainerId containerId = ConverterUtils.toContainerId(envs
398 .get(Environment.CONTAINER_ID.name()));
399 appAttemptID = containerId.getApplicationAttemptId();
400 }
401
402 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
403 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
404 + " not set in the environment");
405 }
406 if (!envs.containsKey(Environment.NM_HOST.name())) {
407 throw new RuntimeException(Environment.NM_HOST.name()
408 + " not set in the environment");
409 }
410 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
411 throw new RuntimeException(Environment.NM_HTTP_PORT
412 + " not set in the environment");
413 }
414 if (!envs.containsKey(Environment.NM_PORT.name())) {
415 throw new RuntimeException(Environment.NM_PORT.name()
416 + " not set in the environment");
417 }
418
419 LOG.info("Application master for app" + ", appId="
420 + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
421 + appAttemptID.getApplicationId().getClusterTimestamp()
422 + ", attemptId=" + appAttemptID.getAttemptId());
423
424 if (!fileExist(shellCommandPath)
425 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
426 throw new IllegalArgumentException(
427 "No shell command or shell script specified to be executed by application master");
428 }
429
430 if (fileExist(shellCommandPath)) {
431 shellCommand = readContent(shellCommandPath);
432 }
433
434 if (fileExist(shellArgsPath)) {
435 shellArgs = readContent(shellArgsPath);
436 }
437
438 if (cliParser.hasOption("shell_env")) {
439 String shellEnvs[] = cliParser.getOptionValues("shell_env");
440 for (String env : shellEnvs) {
441 env = env.trim();
442 int index = env.indexOf('=');
443 if (index == -1) {
444 shellEnv.put(env, "");
445 continue;
446 }
447 String key = env.substring(0, index);
448 String val = "";
449 if (index < (env.length() - 1)) {
450 val = env.substring(index + 1);
451 }
452 shellEnv.put(key, val);
453 }
454 }
455
456 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
457 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
458
459 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
460 shellScriptPathTimestamp = Long.valueOf(envs
461 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
462 }
463 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
464 shellScriptPathLen = Long.valueOf(envs
465 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
466 }
467
468 if (!scriptPath.isEmpty()
469 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
470 LOG.error("Illegal values in env for shell script path" + ", path="
471 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
472 + shellScriptPathTimestamp);
473 throw new IllegalArgumentException(
474 "Illegal values in env for shell script path");
475 }
476 }
477
478 containerMemory = Integer.parseInt(cliParser.getOptionValue(
479 "container_memory", "10"));
480 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
481 "container_vcores", "1"));
482 numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
483 "num_containers", "1"));
484 if (numTotalContainers == 0) {
485 throw new IllegalArgumentException(
486 "Cannot run distributed shell with no containers");
487 }
488 requestPriority = Integer.parseInt(cliParser
489 .getOptionValue("priority", "0"));
490
491 // Creating the Timeline Client
492 timelineClient = TimelineClient.createTimelineClient();
493 timelineClient.init(conf);
494 timelineClient.start();
495
496 return true;
497 }
498
499 /**
500 * Helper function to print usage
501 *
502 * @param opts Parsed command line options
503 */
504 private void printUsage(Options opts) {
505 new HelpFormatter().printHelp("ApplicationMaster", opts);
506 }
507
508 /**
509 * Main run function for the application master
510 *
511 * @throws YarnException
512 * @throws IOException
513 */
514 @SuppressWarnings({ "unchecked" })
515 public void run() throws YarnException, IOException {
516 LOG.info("Starting ApplicationMaster");
517 try {
518 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
519 DSEvent.DS_APP_ATTEMPT_START);
520 } catch (Exception e) {
521 LOG.error("App Attempt start event coud not be pulished for "
522 + appAttemptID.toString(), e);
523 }
524
525 Credentials credentials =
526 UserGroupInformation.getCurrentUser().getCredentials();
527 DataOutputBuffer dob = new DataOutputBuffer();
528 credentials.writeTokenStorageToStream(dob);
529 // Now remove the AM->RM token so that containers cannot access it.
530 Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
531 LOG.info("Executing with tokens:");
532 while (iter.hasNext()) {
533 Token<?> token = iter.next();
534 LOG.info(token);
535 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
536 iter.remove();
537 }
538 }
539 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
540
541 // Create appSubmitterUgi and add original tokens to it
542 String appSubmitterUserName =
543 System.getenv(ApplicationConstants.Environment.USER.name());
544 appSubmitterUgi =
545 UserGroupInformation.createRemoteUser(appSubmitterUserName);
546 appSubmitterUgi.addCredentials(credentials);
547
548 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
549 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
550 amRMClient.init(conf);
551 amRMClient.start();
552
553 containerListener = createNMCallbackHandler();
554 nmClientAsync = new NMClientAsyncImpl(containerListener);
555 nmClientAsync.init(conf);
556 nmClientAsync.start();
557
558 // Setup local RPC Server to accept status requests directly from clients
559 // TODO need to setup a protocol for client to be able to communicate to
560 // the RPC server
561 // TODO use the rpc port info to register with the RM for the client to
562 // send requests to this app master
563
564 // Register self with ResourceManager
565 // This will start heartbeating to the RM
566 appMasterHostname = NetUtils.getHostname();
567 RegisterApplicationMasterResponse response = amRMClient
568 .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
569 appMasterTrackingUrl);
570 // Dump out information about cluster capability as seen by the
571 // resource manager
572 int maxMem = response.getMaximumResourceCapability().getMemory();
573 LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
574
575 int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
576 LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
577
578 // A resource ask cannot exceed the max.
579 if (containerMemory > maxMem) {
580 LOG.info("Container memory specified above max threshold of cluster."
581 + " Using max value." + ", specified=" + containerMemory + ", max="
582 + maxMem);
583 containerMemory = maxMem;
584 }
585
586 if (containerVirtualCores > maxVCores) {
587 LOG.info("Container virtual cores specified above max threshold of cluster."
588 + " Using max value." + ", specified=" + containerVirtualCores + ", max="
589 + maxVCores);
590 containerVirtualCores = maxVCores;
591 }
592
593 List<Container> previousAMRunningContainers =
594 response.getContainersFromPreviousAttempts();
595 LOG.info("Received " + previousAMRunningContainers.size()
596 + " previous AM's running containers on AM registration.");
597 numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
598
599 int numTotalContainersToRequest =
600 numTotalContainers - previousAMRunningContainers.size();
601 // Setup ask for containers from RM
602 // Send request for containers to RM
603 // Until we get our fully allocated quota, we keep on polling RM for
604 // containers
605 // Keep looping until all the containers are launched and shell script
606 // executed on them ( regardless of success/failure).
607 for (int i = 0; i < numTotalContainersToRequest; ++i) {
608 ContainerRequest containerAsk = setupContainerAskForRM();
609 amRMClient.addContainerRequest(containerAsk);
610 }
611 numRequestedContainers.set(numTotalContainersToRequest);
612 try {
613 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
614 DSEvent.DS_APP_ATTEMPT_END);
615 } catch (Exception e) {
616 LOG.error("App Attempt start event coud not be pulished for "
617 + appAttemptID.toString(), e);
618 }
619 }
620
621 @VisibleForTesting
622 NMCallbackHandler createNMCallbackHandler() {
623 return new NMCallbackHandler(this);
624 }
625
626 @VisibleForTesting
627 protected boolean finish() {
628 // wait for completion.
629 while (!done
630 && (numCompletedContainers.get() != numTotalContainers)) {
631 try {
632 Thread.sleep(200);
633 } catch (InterruptedException ex) {}
634 }
635
636 // Join all launched threads
637 // needed for when we time out
638 // and we need to release containers
639 for (Thread launchThread : launchThreads) {
640 try {
641 launchThread.join(10000);
642 } catch (InterruptedException e) {
643 LOG.info("Exception thrown in thread join: " + e.getMessage());
644 e.printStackTrace();
645 }
646 }
647
648 // When the application completes, it should stop all running containers
649 LOG.info("Application completed. Stopping running containers");
650 nmClientAsync.stop();
651
652 // When the application completes, it should send a finish application
653 // signal to the RM
654 LOG.info("Application completed. Signalling finish to RM");
655
656 FinalApplicationStatus appStatus;
657 String appMessage = null;
658 boolean success = true;
659 if (numFailedContainers.get() == 0 &&
660 numCompletedContainers.get() == numTotalContainers) {
661 appStatus = FinalApplicationStatus.SUCCEEDED;
662 } else {
663 appStatus = FinalApplicationStatus.FAILED;
664 appMessage = "Diagnostics." + ", total=" + numTotalContainers
665 + ", completed=" + numCompletedContainers.get() + ", allocated="
666 + numAllocatedContainers.get() + ", failed="
667 + numFailedContainers.get();
668 success = false;
669 }
670 try {
671 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
672 } catch (YarnException ex) {
673 LOG.error("Failed to unregister application", ex);
674 } catch (IOException e) {
675 LOG.error("Failed to unregister application", e);
676 }
677
678 amRMClient.stop();
679
680 return success;
681 }
682
683 private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
684 @SuppressWarnings("unchecked")
685 @Override
686 public void onContainersCompleted(List<ContainerStatus> completedContainers) {
687 LOG.info("Got response from RM for container ask, completedCnt="
688 + completedContainers.size());
689 for (ContainerStatus containerStatus : completedContainers) {
690 LOG.info("Got container status for containerID="
691 + containerStatus.getContainerId() + ", state="
692 + containerStatus.getState() + ", exitStatus="
693 + containerStatus.getExitStatus() + ", diagnostics="
694 + containerStatus.getDiagnostics());
695
696 // non complete containers should not be here
697 assert (containerStatus.getState() == ContainerState.COMPLETE);
698
699 // increment counters for completed/failed containers
700 int exitStatus = containerStatus.getExitStatus();
701 if (0 != exitStatus) {
702 // container failed
703 if (ContainerExitStatus.ABORTED != exitStatus) {
704 // shell script failed
705 // counts as completed
706 numCompletedContainers.incrementAndGet();
707 numFailedContainers.incrementAndGet();
708 } else {
709 // container was killed by framework, possibly preempted
710 // we should re-try as the container was lost for some reason
711 numAllocatedContainers.decrementAndGet();
712 numRequestedContainers.decrementAndGet();
713 // we do not need to release the container as it would be done
714 // by the RM
715 }
716 } else {
717 // nothing to do
718 // container completed successfully
719 numCompletedContainers.incrementAndGet();
720 LOG.info("Container completed successfully." + ", containerId="
721 + containerStatus.getContainerId());
722 }
723 try {
724 publishContainerEndEvent(timelineClient, containerStatus);
725 } catch (Exception e) {
726 LOG.error("Container start event could not be pulished for "
727 + containerStatus.getContainerId().toString(), e);
728 }
729 }
730
731 // ask for more containers if any failed
732 int askCount = numTotalContainers - numRequestedContainers.get();
733 numRequestedContainers.addAndGet(askCount);
734
735 if (askCount > 0) {
736 for (int i = 0; i < askCount; ++i) {
737 ContainerRequest containerAsk = setupContainerAskForRM();
738 amRMClient.addContainerRequest(containerAsk);
739 }
740 }
741
742 if (numCompletedContainers.get() == numTotalContainers) {
743 done = true;
744 }
745 }
746
747 @Override
748 public void onContainersAllocated(List<Container> allocatedContainers) {
749 LOG.info("Got response from RM for container ask, allocatedCnt="
750 + allocatedContainers.size());
751 numAllocatedContainers.addAndGet(allocatedContainers.size());
752 for (Container allocatedContainer : allocatedContainers) {
753 LOG.info("Launching shell command on a new container."
754 + ", containerId=" + allocatedContainer.getId()
755 + ", containerNode=" + allocatedContainer.getNodeId().getHost()
756 + ":" + allocatedContainer.getNodeId().getPort()
757 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
758 + ", containerResourceMemory"
759 + allocatedContainer.getResource().getMemory()
760 + ", containerResourceVirtualCores"
761 + allocatedContainer.getResource().getVirtualCores());
762 // + ", containerToken"
763 // +allocatedContainer.getContainerToken().getIdentifier().toString());
764
765 LaunchContainerRunnable runnableLaunchContainer =
766 new LaunchContainerRunnable(allocatedContainer, containerListener);
767 Thread launchThread = new Thread(runnableLaunchContainer);
768
769 // launch and start the container on a separate thread to keep
770 // the main thread unblocked
771 // as all containers may not be allocated at one go.
772 launchThreads.add(launchThread);
773 launchThread.start();
774 }
775 }
776
777 @Override
778 public void onShutdownRequest() {
779 done = true;
780 }
781
782 @Override
783 public void onNodesUpdated(List<NodeReport> updatedNodes) {}
784
785 @Override
786 public float getProgress() {
787 // set progress to deliver to RM on next heartbeat
788 float progress = (float) numCompletedContainers.get()
789 / numTotalContainers;
790 return progress;
791 }
792
793 @Override
794 public void onError(Throwable e) {
795 done = true;
796 amRMClient.stop();
797 }
798 }
799
800 @VisibleForTesting
801 static class NMCallbackHandler
802 implements NMClientAsync.CallbackHandler {
803
804 private ConcurrentMap<ContainerId, Container> containers =
805 new ConcurrentHashMap<ContainerId, Container>();
806 private final ApplicationMaster applicationMaster;
807
808 public NMCallbackHandler(ApplicationMaster applicationMaster) {
809 this.applicationMaster = applicationMaster;
810 }
811
812 public void addContainer(ContainerId containerId, Container container) {
813 containers.putIfAbsent(containerId, container);
814 }
815
816 @Override
817 public void onContainerStopped(ContainerId containerId) {
818 if (LOG.isDebugEnabled()) {
819 LOG.debug("Succeeded to stop Container " + containerId);
820 }
821 containers.remove(containerId);
822 }
823
824 @Override
825 public void onContainerStatusReceived(ContainerId containerId,
826 ContainerStatus containerStatus) {
827 if (LOG.isDebugEnabled()) {
828 LOG.debug("Container Status: id=" + containerId + ", status=" +
829 containerStatus);
830 }
831 }
832
833 @Override
834 public void onContainerStarted(ContainerId containerId,
835 Map<String, ByteBuffer> allServiceResponse) {
836 if (LOG.isDebugEnabled()) {
837 LOG.debug("Succeeded to start Container " + containerId);
838 }
839 Container container = containers.get(containerId);
840 if (container != null) {
841 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
842 }
843 try {
844 ApplicationMaster.publishContainerStartEvent(
845 applicationMaster.timelineClient, container);
846 } catch (Exception e) {
847 LOG.error("Container start event coud not be pulished for "
848 + container.getId().toString(), e);
849 }
850 }
851
852 @Override
853 public void onStartContainerError(ContainerId containerId, Throwable t) {
854 LOG.error("Failed to start Container " + containerId);
855 containers.remove(containerId);
856 applicationMaster.numCompletedContainers.incrementAndGet();
857 applicationMaster.numFailedContainers.incrementAndGet();
858 }
859
860 @Override
861 public void onGetContainerStatusError(
862 ContainerId containerId, Throwable t) {
863 LOG.error("Failed to query the status of Container " + containerId);
864 }
865
866 @Override
867 public void onStopContainerError(ContainerId containerId, Throwable t) {
868 LOG.error("Failed to stop Container " + containerId);
869 containers.remove(containerId);
870 }
871 }
872
873 /**
874 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
875 * that will execute the shell command.
876 */
877 private class LaunchContainerRunnable implements Runnable {
878
879 // Allocated container
880 Container container;
881
882 NMCallbackHandler containerListener;
883
884 /**
885 * @param lcontainer Allocated container
886 * @param containerListener Callback handler of the container
887 */
888 public LaunchContainerRunnable(
889 Container lcontainer, NMCallbackHandler containerListener) {
890 this.container = lcontainer;
891 this.containerListener = containerListener;
892 }
893
894 @Override
895 /**
896 * Connects to CM, sets up container launch context
897 * for shell command and eventually dispatches the container
898 * start request to the CM.
899 */
900 public void run() {
901 LOG.info("Setting up container launch container for containerid="
902 + container.getId());
903 ContainerLaunchContext ctx = Records
904 .newRecord(ContainerLaunchContext.class);
905
906 // Set the environment
907 ctx.setEnvironment(shellEnv);
908
909 // Set the local resources
910 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
911
912 // The container for the eventual shell commands needs its own local
913 // resources too.
914 // In this scenario, if a shell script is specified, we need to have it
915 // copied and made available to the container.
916 if (!scriptPath.isEmpty()) {
917 Path renamedScriptPath = null;
918 if (Shell.WINDOWS) {
919 renamedScriptPath = new Path(scriptPath + ".bat");
920 } else {
921 renamedScriptPath = new Path(scriptPath + ".sh");
922 }
923
924 try {
925 // rename the script file based on the underlying OS syntax.
926 renameScriptFile(renamedScriptPath);
927 } catch (Exception e) {
928 LOG.error(
929 "Not able to add suffix (.bat/.sh) to the shell script filename",
930 e);
931 // We know we cannot continue launching the container
932 // so we should release it.
933 numCompletedContainers.incrementAndGet();
934 numFailedContainers.incrementAndGet();
935 return;
936 }
937
938 LocalResource shellRsrc = Records.newRecord(LocalResource.class);
939 shellRsrc.setType(LocalResourceType.FILE);
940 shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
941 try {
942 shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
943 renamedScriptPath.toString())));
944 } catch (URISyntaxException e) {
945 LOG.error("Error when trying to use shell script path specified"
946 + " in env, path=" + renamedScriptPath, e);
947
948 // A failure scenario on bad input such as invalid shell script path
949 // We know we cannot continue launching the container
950 // so we should release it.
951 // TODO
952 numCompletedContainers.incrementAndGet();
953 numFailedContainers.incrementAndGet();
954 return;
955 }
956 shellRsrc.setTimestamp(shellScriptPathTimestamp);
957 shellRsrc.setSize(shellScriptPathLen);
958 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
959 ExecShellStringPath, shellRsrc);
960 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
961 }
962 ctx.setLocalResources(localResources);
963
964 // Set the necessary command to execute on the allocated container
965 Vector<CharSequence> vargs = new Vector<CharSequence>(5);
966
967 // Set executable command
968 vargs.add(shellCommand);
969 // Set shell script path
970 if (!scriptPath.isEmpty()) {
971 vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
972 : ExecShellStringPath);
973 }
974
975 // Set args for the shell command if any
976 vargs.add(shellArgs);
977 // Add log redirect params
978 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
979 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
980
981 // Get final commmand
982 StringBuilder command = new StringBuilder();
983 for (CharSequence str : vargs) {
984 command.append(str).append(" ");
985 }
986
987 List<String> commands = new ArrayList<String>();
988 commands.add(command.toString());
989 ctx.setCommands(commands);
990
991 // Set up tokens for the container too. Today, for normal shell commands,
992 // the container in distribute-shell doesn't need any tokens. We are
993 // populating them mainly for NodeManagers to be able to download any
994 // files in the distributed file-system. The tokens are otherwise also
995 // useful in cases, for e.g., when one is running a "hadoop dfs" command
996 // inside the distributed shell.
997 ctx.setTokens(allTokens.duplicate());
998
999 containerListener.addContainer(container.getId(), container);
1000 nmClientAsync.startContainerAsync(container, ctx);
1001 }
1002 }
1003
1004 private void renameScriptFile(final Path renamedScriptPath)
1005 throws IOException, InterruptedException {
1006 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1007 @Override
1008 public Void run() throws IOException {
1009 FileSystem fs = renamedScriptPath.getFileSystem(conf);
1010 fs.rename(new Path(scriptPath), renamedScriptPath);
1011 return null;
1012 }
1013 });
1014 LOG.info("User " + appSubmitterUgi.getUserName()
1015 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1016 }
1017
1018 /**
1019 * Setup the request that will be sent to the RM for the container ask.
1020 *
1021 * @return the setup ResourceRequest to be sent to RM
1022 */
1023 private ContainerRequest setupContainerAskForRM() {
1024 // setup requirements for hosts
1025 // using * as any host will do for the distributed shell app
1026 // set the priority for the request
1027 Priority pri = Records.newRecord(Priority.class);
1028 // TODO - what is the range for priority? how to decide?
1029 pri.setPriority(requestPriority);
1030
1031 // Set up resource type requirements
1032 // For now, memory and CPU are supported so we set memory and cpu requirements
1033 Resource capability = Records.newRecord(Resource.class);
1034 capability.setMemory(containerMemory);
1035 capability.setVirtualCores(containerVirtualCores);
1036
1037 ContainerRequest request = new ContainerRequest(capability, null, null,
1038 pri);
1039 LOG.info("Requested container ask: " + request.toString());
1040 return request;
1041 }
1042
1043 private boolean fileExist(String filePath) {
1044 return new File(filePath).exists();
1045 }
1046
1047 private String readContent(String filePath) throws IOException {
1048 DataInputStream ds = null;
1049 try {
1050 ds = new DataInputStream(new FileInputStream(filePath));
1051 return ds.readUTF();
1052 } finally {
1053 org.apache.commons.io.IOUtils.closeQuietly(ds);
1054 }
1055 }
1056
1057 private static void publishContainerStartEvent(TimelineClient timelineClient,
1058 Container container) throws IOException, YarnException {
1059 TimelineEntity entity = new TimelineEntity();
1060 entity.setEntityId(container.getId().toString());
1061 entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1062 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1063 .toString());
1064 TimelineEvent event = new TimelineEvent();
1065 event.setTimestamp(System.currentTimeMillis());
1066 event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1067 event.addEventInfo("Node", container.getNodeId().toString());
1068 event.addEventInfo("Resources", container.getResource().toString());
1069 entity.addEvent(event);
1070
1071 timelineClient.putEntities(entity);
1072 }
1073
1074 private static void publishContainerEndEvent(TimelineClient timelineClient,
1075 ContainerStatus container) throws IOException, YarnException {
1076 TimelineEntity entity = new TimelineEntity();
1077 entity.setEntityId(container.getContainerId().toString());
1078 entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1079 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1080 .toString());
1081 TimelineEvent event = new TimelineEvent();
1082 event.setTimestamp(System.currentTimeMillis());
1083 event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1084 event.addEventInfo("State", container.getState().name());
1085 event.addEventInfo("Exit Status", container.getExitStatus());
1086 entity.addEvent(event);
1087
1088 timelineClient.putEntities(entity);
1089 }
1090
1091 private static void publishApplicationAttemptEvent(
1092 TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
1093 throws IOException, YarnException {
1094 TimelineEntity entity = new TimelineEntity();
1095 entity.setEntityId(appAttemptId);
1096 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1097 entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1098 .toString());
1099 TimelineEvent event = new TimelineEvent();
1100 event.setEventType(appEvent.toString());
1101 event.setTimestamp(System.currentTimeMillis());
1102 entity.addEvent(event);
1103
1104 timelineClient.putEntities(entity);
1105 }
1106 }