001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.HashMap;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Vector;
027
028 import org.apache.commons.cli.CommandLine;
029 import org.apache.commons.cli.GnuParser;
030 import org.apache.commons.cli.HelpFormatter;
031 import org.apache.commons.cli.Options;
032 import org.apache.commons.cli.ParseException;
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.hadoop.classification.InterfaceAudience;
036 import org.apache.hadoop.classification.InterfaceStability;
037 import org.apache.hadoop.conf.Configuration;
038 import org.apache.hadoop.fs.FileStatus;
039 import org.apache.hadoop.fs.FileSystem;
040 import org.apache.hadoop.fs.Path;
041 import org.apache.hadoop.yarn.api.ApplicationConstants;
042 import org.apache.hadoop.yarn.api.ClientRMProtocol;
043 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
044 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
045 import org.apache.hadoop.yarn.api.records.ApplicationId;
046 import org.apache.hadoop.yarn.api.records.ApplicationReport;
047 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
048 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
049 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
050 import org.apache.hadoop.yarn.api.records.LocalResource;
051 import org.apache.hadoop.yarn.api.records.LocalResourceType;
052 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
053 import org.apache.hadoop.yarn.api.records.NodeReport;
054 import org.apache.hadoop.yarn.api.records.Priority;
055 import org.apache.hadoop.yarn.api.records.QueueACL;
056 import org.apache.hadoop.yarn.api.records.QueueInfo;
057 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
058 import org.apache.hadoop.yarn.api.records.Resource;
059 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
060 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
061 import org.apache.hadoop.yarn.client.YarnClientImpl;
062 import org.apache.hadoop.yarn.conf.YarnConfiguration;
063 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
064 import org.apache.hadoop.yarn.util.ConverterUtils;
065 import org.apache.hadoop.yarn.util.Records;
066
067 /**
068 * Client for Distributed Shell application submission to YARN.
069 *
070 * <p> The distributed shell client allows an application master to be launched that in turn would run
071 * the provided shell command on a set of containers. </p>
072 *
073 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
074 *
075 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
076 * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol}
077 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
079 *
080 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
081 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
082 * and application name, the priority assigned to the application and the queue
083 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
084 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
085 * the {@link ApplicationMaster} is launched. </p>
086 *
087 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
088 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
089 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
091 *
092 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
093 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
094 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
095 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
096 *
097 */
098 @InterfaceAudience.Public
099 @InterfaceStability.Unstable
100 public class Client extends YarnClientImpl {
101
// Logger shared by all instances of this client
private static final Log LOG = LogFactory.getLog(Client.class);

// Configuration supplied to the constructor; used in run() for filesystem access
// and for looking up the application classpath
private Configuration conf;

// Application master specific info to register a new Application with RM/ASM
// Application name (-appname); init(String[]) falls back to "DistributedShell"
private String appName = "";
// App master priority (-priority)
private int amPriority = 0;
// Queue for App master (-queue); init(String[]) falls back to "default"
private String amQueue = "";
// Amt. of memory resource, in MB, to request for to run the App Master (-master_memory).
// run() adjusts this to lie within the min/max reported by the resource manager.
private int amMemory = 10;

// Application master jar file on the local filesystem (-jar, mandatory)
private String appMasterJar = "";
// Main class to invoke application master
private final String appMasterMainClass =
    "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster";

// Shell command to be executed (-shell_command, mandatory)
private String shellCommand = "";
// Location of shell script on the local filesystem (-shell_script); empty when not given
private String shellScriptPath = "";
// Args to be passed to the shell command (-shell_args)
private String shellArgs = "";
// Env variables to be setup for the shell command (-shell_env key=value pairs)
private Map<String, String> shellEnv = new HashMap<String, String>();
// Shell Command Container priority (-shell_cmd_priority)
private int shellCmdPriority = 0;

// Amt of memory, in MB, to request for container in which shell script will be executed
private int containerMemory = 10;
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;

// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";

// Start time for client; captured when this instance is created
private final long clientStartTime = System.currentTimeMillis();
// Timeout threshold for client, in milliseconds. Kill app after time interval expires.
private long clientTimeout = 600000;

// Debug flag (-debug); when set, run() passes --debug on to the application master
boolean debugFlag = false;

// Command line options; populated by the constructor
private Options opts;
152
153 /**
154 * @param args Command line arguments
155 */
156 public static void main(String[] args) {
157 boolean result = false;
158 try {
159 Client client = new Client();
160 LOG.info("Initializing Client");
161 try {
162 boolean doRun = client.init(args);
163 if (!doRun) {
164 System.exit(0);
165 }
166 } catch (IllegalArgumentException e) {
167 System.err.println(e.getLocalizedMessage());
168 client.printUsage();
169 System.exit(-1);
170 }
171 result = client.run();
172 } catch (Throwable t) {
173 LOG.fatal("Error running CLient", t);
174 System.exit(1);
175 }
176 if (result) {
177 LOG.info("Application completed successfully");
178 System.exit(0);
179 }
180 LOG.error("Application failed to complete successfully");
181 System.exit(2);
182 }
183
/**
 * Creates a client that uses the given configuration and registers every
 * command line option understood by {@link #init(String[])}.
 *
 * @param conf Configuration kept by this client and passed to the inherited
 *             {@code init(Configuration)}
 * @throws Exception declared by this constructor's signature
 */
public Client(Configuration conf) throws Exception {
  super();
  this.conf = conf;
  init(conf);
  // addOption returns the Options instance, so the registrations can be chained
  opts = new Options()
      .addOption("appname", true, "Application Name. Default value - DistributedShell")
      .addOption("priority", true, "Application Priority. Default 0")
      .addOption("queue", true, "RM Queue in which this application is to be submitted")
      .addOption("timeout", true, "Application timeout in milliseconds")
      .addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master")
      .addOption("jar", true, "Jar file containing the application master")
      .addOption("shell_command", true, "Shell command to be executed by the Application Master")
      .addOption("shell_script", true, "Location of the shell script to be executed")
      .addOption("shell_args", true, "Command line args for the shell script")
      .addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs")
      .addOption("shell_cmd_priority", true, "Priority for the shell command containers")
      .addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command")
      .addOption("num_containers", true, "No. of containers on which the shell command needs to be executed")
      .addOption("log_properties", true, "log4j.properties file")
      .addOption("debug", false, "Dump out debug information")
      .addOption("help", false, "Print usage");
}
208
/**
 * Creates a client backed by a freshly constructed {@link YarnConfiguration}.
 *
 * @throws Exception if the delegated {@link #Client(Configuration)} constructor fails
 */
public Client() throws Exception {
  this(new YarnConfiguration());
}
214
/**
 * Helper function to print out usage.
 * Prints the help text for this client's registered command line options
 * under the program name "Client".
 */
private void printUsage() {
  new HelpFormatter().printHelp("Client", opts);
}
222
223 /**
224 * Parse command line options
225 * @param args Parsed command line options
226 * @return Whether the init was successful to run the client
227 * @throws ParseException
228 */
229 public boolean init(String[] args) throws ParseException {
230
231 CommandLine cliParser = new GnuParser().parse(opts, args);
232
233 if (args.length == 0) {
234 throw new IllegalArgumentException("No args specified for client to initialize");
235 }
236
237 if (cliParser.hasOption("help")) {
238 printUsage();
239 return false;
240 }
241
242 if (cliParser.hasOption("debug")) {
243 debugFlag = true;
244
245 }
246
247 appName = cliParser.getOptionValue("appname", "DistributedShell");
248 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
249 amQueue = cliParser.getOptionValue("queue", "default");
250 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
251
252 if (amMemory < 0) {
253 throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
254 + " Specified memory=" + amMemory);
255 }
256
257 if (!cliParser.hasOption("jar")) {
258 throw new IllegalArgumentException("No jar file specified for application master");
259 }
260
261 appMasterJar = cliParser.getOptionValue("jar");
262
263 if (!cliParser.hasOption("shell_command")) {
264 throw new IllegalArgumentException("No shell command specified to be executed by application master");
265 }
266 shellCommand = cliParser.getOptionValue("shell_command");
267
268 if (cliParser.hasOption("shell_script")) {
269 shellScriptPath = cliParser.getOptionValue("shell_script");
270 }
271 if (cliParser.hasOption("shell_args")) {
272 shellArgs = cliParser.getOptionValue("shell_args");
273 }
274 if (cliParser.hasOption("shell_env")) {
275 String envs[] = cliParser.getOptionValues("shell_env");
276 for (String env : envs) {
277 env = env.trim();
278 int index = env.indexOf('=');
279 if (index == -1) {
280 shellEnv.put(env, "");
281 continue;
282 }
283 String key = env.substring(0, index);
284 String val = "";
285 if (index < (env.length()-1)) {
286 val = env.substring(index+1);
287 }
288 shellEnv.put(key, val);
289 }
290 }
291 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
292
293 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
294 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
295
296 if (containerMemory < 0 || numContainers < 1) {
297 throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
298 + " Specified containerMemory=" + containerMemory
299 + ", numContainer=" + numContainers);
300 }
301
302 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
303
304 log4jPropFile = cliParser.getOptionValue("log_properties", "");
305
306 return true;
307 }
308
309 /**
310 * Main run function for the client
311 * @return true if application completed successfully
312 * @throws IOException
313 */
314 public boolean run() throws IOException {
315
316 LOG.info("Running Client");
317 start();
318
319 YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
320 LOG.info("Got Cluster metric info from ASM"
321 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
322
323 List<NodeReport> clusterNodeReports = super.getNodeReports();
324 LOG.info("Got Cluster node info from ASM");
325 for (NodeReport node : clusterNodeReports) {
326 LOG.info("Got node report from ASM for"
327 + ", nodeId=" + node.getNodeId()
328 + ", nodeAddress" + node.getHttpAddress()
329 + ", nodeRackName" + node.getRackName()
330 + ", nodeNumContainers" + node.getNumContainers()
331 + ", nodeHealthStatus" + node.getNodeHealthStatus());
332 }
333
334 QueueInfo queueInfo = super.getQueueInfo(this.amQueue);
335 LOG.info("Queue info"
336 + ", queueName=" + queueInfo.getQueueName()
337 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
338 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
339 + ", queueApplicationCount=" + queueInfo.getApplications().size()
340 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
341
342 List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
343 for (QueueUserACLInfo aclInfo : listAclInfo) {
344 for (QueueACL userAcl : aclInfo.getUserAcls()) {
345 LOG.info("User ACL Info for Queue"
346 + ", queueName=" + aclInfo.getQueueName()
347 + ", userAcl=" + userAcl.name());
348 }
349 }
350
351 // Get a new application id
352 GetNewApplicationResponse newApp = super.getNewApplication();
353 ApplicationId appId = newApp.getApplicationId();
354
355 // TODO get min/max resource capabilities from RM and change memory ask if needed
356 // If we do not have min/max, we may not be able to correctly request
357 // the required resources from the RM for the app master
358 // Memory ask has to be a multiple of min and less than max.
359 // Dump out information about cluster capability as seen by the resource manager
360 int minMem = newApp.getMinimumResourceCapability().getMemory();
361 int maxMem = newApp.getMaximumResourceCapability().getMemory();
362 LOG.info("Min mem capabililty of resources in this cluster " + minMem);
363 LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
364
365 // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
366 // a multiple of the min value and cannot exceed the max.
367 // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
368 if (amMemory < minMem) {
369 LOG.info("AM memory specified below min threshold of cluster. Using min value."
370 + ", specified=" + amMemory
371 + ", min=" + minMem);
372 amMemory = minMem;
373 }
374 else if (amMemory > maxMem) {
375 LOG.info("AM memory specified above max threshold of cluster. Using max value."
376 + ", specified=" + amMemory
377 + ", max=" + maxMem);
378 amMemory = maxMem;
379 }
380
381 // Create launch context for app master
382 LOG.info("Setting up application submission context for ASM");
383 ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
384
385 // set the application id
386 appContext.setApplicationId(appId);
387 // set the application name
388 appContext.setApplicationName(appName);
389
390 // Set up the container launch context for the application master
391 ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
392
393 // set local resources for the application master
394 // local files or archives as needed
395 // In this scenario, the jar file for the application master is part of the local resources
396 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
397
398 LOG.info("Copy App Master jar from local filesystem and add to local environment");
399 // Copy the application master jar to the filesystem
400 // Create a local resource to point to the destination jar path
401 FileSystem fs = FileSystem.get(conf);
402 Path src = new Path(appMasterJar);
403 String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
404 Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
405 fs.copyFromLocalFile(false, true, src, dst);
406 FileStatus destStatus = fs.getFileStatus(dst);
407 LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
408
409 // Set the type of resource - file or archive
410 // archives are untarred at destination
411 // we don't need the jar file to be untarred for now
412 amJarRsrc.setType(LocalResourceType.FILE);
413 // Set visibility of the resource
414 // Setting to most private option
415 amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
416 // Set the resource to be copied over
417 amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
418 // Set timestamp and length of file so that the framework
419 // can do basic sanity checks for the local resource
420 // after it has been copied over to ensure it is the same
421 // resource the client intended to use with the application
422 amJarRsrc.setTimestamp(destStatus.getModificationTime());
423 amJarRsrc.setSize(destStatus.getLen());
424 localResources.put("AppMaster.jar", amJarRsrc);
425
426 // Set the log4j properties if needed
427 if (!log4jPropFile.isEmpty()) {
428 Path log4jSrc = new Path(log4jPropFile);
429 Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
430 fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
431 FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
432 LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
433 log4jRsrc.setType(LocalResourceType.FILE);
434 log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
435 log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
436 log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
437 log4jRsrc.setSize(log4jFileStatus.getLen());
438 localResources.put("log4j.properties", log4jRsrc);
439 }
440
441 // The shell script has to be made available on the final container(s)
442 // where it will be executed.
443 // To do this, we need to first copy into the filesystem that is visible
444 // to the yarn framework.
445 // We do not need to set this as a local resource for the application
446 // master as the application master does not need it.
447 String hdfsShellScriptLocation = "";
448 long hdfsShellScriptLen = 0;
449 long hdfsShellScriptTimestamp = 0;
450 if (!shellScriptPath.isEmpty()) {
451 Path shellSrc = new Path(shellScriptPath);
452 String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
453 Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
454 fs.copyFromLocalFile(false, true, shellSrc, shellDst);
455 hdfsShellScriptLocation = shellDst.toUri().toString();
456 FileStatus shellFileStatus = fs.getFileStatus(shellDst);
457 hdfsShellScriptLen = shellFileStatus.getLen();
458 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
459 }
460
461 // Set local resource info into app master container launch context
462 amContainer.setLocalResources(localResources);
463
464 // Set the necessary security tokens as needed
465 //amContainer.setContainerTokens(containerToken);
466
467 // Set the env variables to be setup in the env where the application master will be run
468 LOG.info("Set the environment for the application master");
469 Map<String, String> env = new HashMap<String, String>();
470
471 // put location of shell script into env
472 // using the env info, the application master will create the correct local resource for the
473 // eventual containers that will be launched to execute the shell scripts
474 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
475 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
476 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
477
478 // Add AppMaster.jar location to classpath
479 // At some point we should not be required to add
480 // the hadoop specific classpaths to the env.
481 // It should be provided out of the box.
482 // For now setting all required classpaths including
483 // the classpath to "." for the application jar
484 StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
485 for (String c : conf.getStrings(
486 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
487 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
488 classPathEnv.append(':');
489 classPathEnv.append(c.trim());
490 }
491 classPathEnv.append(":./log4j.properties");
492
493 // add the runtime classpath needed for tests to work
494 if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
495 classPathEnv.append(':');
496 classPathEnv.append(System.getProperty("java.class.path"));
497 }
498
499 env.put("CLASSPATH", classPathEnv.toString());
500
501 amContainer.setEnvironment(env);
502
503 // Set the necessary command to execute the application master
504 Vector<CharSequence> vargs = new Vector<CharSequence>(30);
505
506 // Set java executable command
507 LOG.info("Setting up app master command");
508 vargs.add("${JAVA_HOME}" + "/bin/java");
509 // Set Xmx based on am memory size
510 vargs.add("-Xmx" + amMemory + "m");
511 // Set class name
512 vargs.add(appMasterMainClass);
513 // Set params for Application Master
514 vargs.add("--container_memory " + String.valueOf(containerMemory));
515 vargs.add("--num_containers " + String.valueOf(numContainers));
516 vargs.add("--priority " + String.valueOf(shellCmdPriority));
517 if (!shellCommand.isEmpty()) {
518 vargs.add("--shell_command " + shellCommand + "");
519 }
520 if (!shellArgs.isEmpty()) {
521 vargs.add("--shell_args " + shellArgs + "");
522 }
523 for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
524 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
525 }
526 if (debugFlag) {
527 vargs.add("--debug");
528 }
529
530 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
531 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
532
533 // Get final commmand
534 StringBuilder command = new StringBuilder();
535 for (CharSequence str : vargs) {
536 command.append(str).append(" ");
537 }
538
539 LOG.info("Completed setting up app master command " + command.toString());
540 List<String> commands = new ArrayList<String>();
541 commands.add(command.toString());
542 amContainer.setCommands(commands);
543
544 // Set up resource type requirements
545 // For now, only memory is supported so we set memory requirements
546 Resource capability = Records.newRecord(Resource.class);
547 capability.setMemory(amMemory);
548 amContainer.setResource(capability);
549
550 // Service data is a binary blob that can be passed to the application
551 // Not needed in this scenario
552 // amContainer.setServiceData(serviceData);
553
554 // The following are not required for launching an application master
555 // amContainer.setContainerId(containerId);
556
557 appContext.setAMContainerSpec(amContainer);
558
559 // Set the priority for the application master
560 Priority pri = Records.newRecord(Priority.class);
561 // TODO - what is the range for priority? how to decide?
562 pri.setPriority(amPriority);
563 appContext.setPriority(pri);
564
565 // Set the queue to which this application is to be submitted in the RM
566 appContext.setQueue(amQueue);
567
568 // Submit the application to the applications manager
569 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
570 // Ignore the response as either a valid response object is returned on success
571 // or an exception thrown to denote some form of a failure
572 LOG.info("Submitting application to ASM");
573 super.submitApplication(appContext);
574
575 // TODO
576 // Try submitting the same request again
577 // app submission failure?
578
579 // Monitor the application
580 return monitorApplication(appId);
581
582 }
583
584 /**
585 * Monitor the submitted application for completion.
586 * Kill application if time expires.
587 * @param appId Application Id of application to be monitored
588 * @return true if application completed successfully
589 * @throws YarnRemoteException
590 */
591 private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
592
593 while (true) {
594
595 // Check app status every 1 second.
596 try {
597 Thread.sleep(1000);
598 } catch (InterruptedException e) {
599 LOG.debug("Thread sleep in monitoring loop interrupted");
600 }
601
602 // Get application report for the appId we are interested in
603 ApplicationReport report = super.getApplicationReport(appId);
604
605 LOG.info("Got application report from ASM for"
606 + ", appId=" + appId.getId()
607 + ", clientToken=" + report.getClientToken()
608 + ", appDiagnostics=" + report.getDiagnostics()
609 + ", appMasterHost=" + report.getHost()
610 + ", appQueue=" + report.getQueue()
611 + ", appMasterRpcPort=" + report.getRpcPort()
612 + ", appStartTime=" + report.getStartTime()
613 + ", yarnAppState=" + report.getYarnApplicationState().toString()
614 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
615 + ", appTrackingUrl=" + report.getTrackingUrl()
616 + ", appUser=" + report.getUser());
617
618 YarnApplicationState state = report.getYarnApplicationState();
619 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
620 if (YarnApplicationState.FINISHED == state) {
621 if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
622 LOG.info("Application has completed successfully. Breaking monitoring loop");
623 return true;
624 }
625 else {
626 LOG.info("Application did finished unsuccessfully."
627 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
628 + ". Breaking monitoring loop");
629 return false;
630 }
631 }
632 else if (YarnApplicationState.KILLED == state
633 || YarnApplicationState.FAILED == state) {
634 LOG.info("Application did not finish."
635 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
636 + ". Breaking monitoring loop");
637 return false;
638 }
639
640 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
641 LOG.info("Reached client specified timeout for application. Killing application");
642 forceKillApplication(appId);
643 return false;
644 }
645 }
646
647 }
648
/**
 * Kill a submitted application by sending a call to the ASM.
 * Delegates to the inherited {@code killApplication} and discards its result.
 *
 * @param appId Application Id to be killed.
 * @throws YarnRemoteException if the delegated kill call fails
 */
private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
  // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
  // the same time.
  // If yes, can we kill a particular attempt only?

  // Response can be ignored as it is non-null on success or
  // throws an exception in case of failures
  super.killApplication(appId);
}
663
664 }