001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
021 import java.io.BufferedReader;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.InputStreamReader;
025 import java.util.ArrayList;
026 import java.util.HashMap;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Vector;
030
031 import org.apache.commons.cli.CommandLine;
032 import org.apache.commons.cli.GnuParser;
033 import org.apache.commons.cli.HelpFormatter;
034 import org.apache.commons.cli.Options;
035 import org.apache.commons.cli.ParseException;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038 import org.apache.hadoop.classification.InterfaceAudience;
039 import org.apache.hadoop.classification.InterfaceStability;
040 import org.apache.hadoop.conf.Configuration;
041 import org.apache.hadoop.fs.FileStatus;
042 import org.apache.hadoop.fs.FileSystem;
043 import org.apache.hadoop.fs.Path;
044 import org.apache.hadoop.yarn.api.ApplicationConstants;
045 import org.apache.hadoop.yarn.api.ClientRMProtocol;
046 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
047 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048 import org.apache.hadoop.yarn.api.records.ApplicationId;
049 import org.apache.hadoop.yarn.api.records.ApplicationReport;
050 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
051 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
052 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
053 import org.apache.hadoop.yarn.api.records.LocalResource;
054 import org.apache.hadoop.yarn.api.records.LocalResourceType;
055 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
056 import org.apache.hadoop.yarn.api.records.NodeReport;
057 import org.apache.hadoop.yarn.api.records.Priority;
058 import org.apache.hadoop.yarn.api.records.QueueACL;
059 import org.apache.hadoop.yarn.api.records.QueueInfo;
060 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
061 import org.apache.hadoop.yarn.api.records.Resource;
062 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
063 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
064 import org.apache.hadoop.yarn.client.YarnClientImpl;
065 import org.apache.hadoop.yarn.conf.YarnConfiguration;
066 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
067 import org.apache.hadoop.yarn.util.ConverterUtils;
068 import org.apache.hadoop.yarn.util.Records;
069
070 /**
071 * Client for Distributed Shell application submission to YARN.
072 *
073 * <p> The distributed shell client allows an application master to be launched that in turn would run
074 * the provided shell command on a set of containers. </p>
075 *
076 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
077 *
078 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
079 * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol}
080 * provides a way for the client to get access to cluster information and to request for a
081 * new {@link ApplicationId}. <p>
082 *
083 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
084 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
085 * and application name, the priority assigned to the application and the queue
086 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
087 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
088 * the {@link ApplicationMaster} is launched. </p>
089 *
090 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
091 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
092 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
093 * {@link ApplicationMaster}. <p>
094 *
095 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
096 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
097 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
098 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
099 *
100 */
101 @InterfaceAudience.Public
102 @InterfaceStability.Unstable
103 public class Client extends YarnClientImpl {
104
105 private static final Log LOG = LogFactory.getLog(Client.class);
106
107 // Configuration
108 private Configuration conf;
109
110 // Application master specific info to register a new Application with RM/ASM
111 private String appName = "";
112 // App master priority
113 private int amPriority = 0;
114 // Queue for App master
115 private String amQueue = "";
116 // Amt. of memory resource to request for to run the App Master
117 private int amMemory = 10;
118
119 // Application master jar file
120 private String appMasterJar = "";
121 // Main class to invoke application master
122 private String appMasterMainClass = "";
123
124 // Shell command to be executed
125 private String shellCommand = "";
126 // Location of shell script
127 private String shellScriptPath = "";
128 // Args to be passed to the shell command
129 private String shellArgs = "";
130 // Env variables to be setup for the shell command
131 private Map<String, String> shellEnv = new HashMap<String, String>();
132 // Shell Command Container priority
133 private int shellCmdPriority = 0;
134
135 // Amt of memory to request for container in which shell script will be executed
136 private int containerMemory = 10;
137 // No. of containers in which the shell script needs to be executed
138 private int numContainers = 1;
139
140 // log4j.properties file
141 // if available, add to local resources and set into classpath
142 private String log4jPropFile = "";
143
144 // Start time for client
145 private final long clientStartTime = System.currentTimeMillis();
146 // Timeout threshold for client. Kill app after time interval expires.
147 private long clientTimeout = 600000;
148
149 // Debug flag
150 boolean debugFlag = false;
151
152 /**
153 * @param args Command line arguments
154 */
155 public static void main(String[] args) {
156 boolean result = false;
157 try {
158 Client client = new Client();
159 LOG.info("Initializing Client");
160 boolean doRun = client.init(args);
161 if (!doRun) {
162 System.exit(0);
163 }
164 result = client.run();
165 } catch (Throwable t) {
166 LOG.fatal("Error running CLient", t);
167 System.exit(1);
168 }
169 if (result) {
170 LOG.info("Application completed successfully");
171 System.exit(0);
172 }
173 LOG.error("Application failed to complete successfully");
174 System.exit(2);
175 }
176
177 /**
178 */
179 public Client(Configuration conf) throws Exception {
180 super();
181 this.conf = conf;
182 init(conf);
183 }
184
185 /**
186 */
187 public Client() throws Exception {
188 this(new Configuration());
189 }
190
191 /**
192 * Helper function to print out usage
193 * @param opts Parsed command line options
194 */
195 private void printUsage(Options opts) {
196 new HelpFormatter().printHelp("Client", opts);
197 }
198
199 /**
200 * Parse command line options
201 * @param args Parsed command line options
202 * @return Whether the init was successful to run the client
203 * @throws ParseException
204 */
205 public boolean init(String[] args) throws ParseException {
206
207 Options opts = new Options();
208 opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
209 opts.addOption("priority", true, "Application Priority. Default 0");
210 opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
211 opts.addOption("timeout", true, "Application timeout in milliseconds");
212 opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
213 opts.addOption("jar", true, "Jar file containing the application master");
214 opts.addOption("class", true, "Main class to be run for the Application Master.");
215 opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
216 opts.addOption("shell_script", true, "Location of the shell script to be executed");
217 opts.addOption("shell_args", true, "Command line args for the shell script");
218 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
219 opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
220 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
221 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
222 opts.addOption("log_properties", true, "log4j.properties file");
223 opts.addOption("debug", false, "Dump out debug information");
224 opts.addOption("help", false, "Print usage");
225 CommandLine cliParser = new GnuParser().parse(opts, args);
226
227 if (args.length == 0) {
228 printUsage(opts);
229 throw new IllegalArgumentException("No args specified for client to initialize");
230 }
231
232 if (cliParser.hasOption("help")) {
233 printUsage(opts);
234 return false;
235 }
236
237 if (cliParser.hasOption("debug")) {
238 debugFlag = true;
239
240 }
241
242 appName = cliParser.getOptionValue("appname", "DistributedShell");
243 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
244 amQueue = cliParser.getOptionValue("queue", "default");
245 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
246
247 if (amMemory < 0) {
248 throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
249 + " Specified memory=" + amMemory);
250 }
251
252 if (!cliParser.hasOption("jar")) {
253 throw new IllegalArgumentException("No jar file specified for application master");
254 }
255
256 appMasterJar = cliParser.getOptionValue("jar");
257 appMasterMainClass = cliParser.getOptionValue("class",
258 "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
259
260 if (!cliParser.hasOption("shell_command")) {
261 throw new IllegalArgumentException("No shell command specified to be executed by application master");
262 }
263 shellCommand = cliParser.getOptionValue("shell_command");
264
265 if (cliParser.hasOption("shell_script")) {
266 shellScriptPath = cliParser.getOptionValue("shell_script");
267 }
268 if (cliParser.hasOption("shell_args")) {
269 shellArgs = cliParser.getOptionValue("shell_args");
270 }
271 if (cliParser.hasOption("shell_env")) {
272 String envs[] = cliParser.getOptionValues("shell_env");
273 for (String env : envs) {
274 env = env.trim();
275 int index = env.indexOf('=');
276 if (index == -1) {
277 shellEnv.put(env, "");
278 continue;
279 }
280 String key = env.substring(0, index);
281 String val = "";
282 if (index < (env.length()-1)) {
283 val = env.substring(index+1);
284 }
285 shellEnv.put(key, val);
286 }
287 }
288 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
289
290 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
291 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
292
293 if (containerMemory < 0 || numContainers < 1) {
294 throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
295 + " Specified containerMemory=" + containerMemory
296 + ", numContainer=" + numContainers);
297 }
298
299 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
300
301 log4jPropFile = cliParser.getOptionValue("log_properties", "");
302
303 return true;
304 }
305
306 /**
307 * Main run function for the client
308 * @return true if application completed successfully
309 * @throws IOException
310 */
311 public boolean run() throws IOException {
312
313 LOG.info("Running Client");
314 start();
315
316 YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
317 LOG.info("Got Cluster metric info from ASM"
318 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
319
320 List<NodeReport> clusterNodeReports = super.getNodeReports();
321 LOG.info("Got Cluster node info from ASM");
322 for (NodeReport node : clusterNodeReports) {
323 LOG.info("Got node report from ASM for"
324 + ", nodeId=" + node.getNodeId()
325 + ", nodeAddress" + node.getHttpAddress()
326 + ", nodeRackName" + node.getRackName()
327 + ", nodeNumContainers" + node.getNumContainers()
328 + ", nodeHealthStatus" + node.getNodeHealthStatus());
329 }
330
331 QueueInfo queueInfo = super.getQueueInfo(this.amQueue);
332 LOG.info("Queue info"
333 + ", queueName=" + queueInfo.getQueueName()
334 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
335 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
336 + ", queueApplicationCount=" + queueInfo.getApplications().size()
337 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
338
339 List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
340 for (QueueUserACLInfo aclInfo : listAclInfo) {
341 for (QueueACL userAcl : aclInfo.getUserAcls()) {
342 LOG.info("User ACL Info for Queue"
343 + ", queueName=" + aclInfo.getQueueName()
344 + ", userAcl=" + userAcl.name());
345 }
346 }
347
348 // Get a new application id
349 GetNewApplicationResponse newApp = super.getNewApplication();
350 ApplicationId appId = newApp.getApplicationId();
351
352 // TODO get min/max resource capabilities from RM and change memory ask if needed
353 // If we do not have min/max, we may not be able to correctly request
354 // the required resources from the RM for the app master
355 // Memory ask has to be a multiple of min and less than max.
356 // Dump out information about cluster capability as seen by the resource manager
357 int minMem = newApp.getMinimumResourceCapability().getMemory();
358 int maxMem = newApp.getMaximumResourceCapability().getMemory();
359 LOG.info("Min mem capabililty of resources in this cluster " + minMem);
360 LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
361
362 // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
363 // a multiple of the min value and cannot exceed the max.
364 // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
365 if (amMemory < minMem) {
366 LOG.info("AM memory specified below min threshold of cluster. Using min value."
367 + ", specified=" + amMemory
368 + ", min=" + minMem);
369 amMemory = minMem;
370 }
371 else if (amMemory > maxMem) {
372 LOG.info("AM memory specified above max threshold of cluster. Using max value."
373 + ", specified=" + amMemory
374 + ", max=" + maxMem);
375 amMemory = maxMem;
376 }
377
378 // Create launch context for app master
379 LOG.info("Setting up application submission context for ASM");
380 ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
381
382 // set the application id
383 appContext.setApplicationId(appId);
384 // set the application name
385 appContext.setApplicationName(appName);
386
387 // Set up the container launch context for the application master
388 ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
389
390 // set local resources for the application master
391 // local files or archives as needed
392 // In this scenario, the jar file for the application master is part of the local resources
393 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
394
395 LOG.info("Copy App Master jar from local filesystem and add to local environment");
396 // Copy the application master jar to the filesystem
397 // Create a local resource to point to the destination jar path
398 FileSystem fs = FileSystem.get(conf);
399 Path src = new Path(appMasterJar);
400 String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
401 Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
402 fs.copyFromLocalFile(false, true, src, dst);
403 FileStatus destStatus = fs.getFileStatus(dst);
404 LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
405
406 // Set the type of resource - file or archive
407 // archives are untarred at destination
408 // we don't need the jar file to be untarred for now
409 amJarRsrc.setType(LocalResourceType.FILE);
410 // Set visibility of the resource
411 // Setting to most private option
412 amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
413 // Set the resource to be copied over
414 amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
415 // Set timestamp and length of file so that the framework
416 // can do basic sanity checks for the local resource
417 // after it has been copied over to ensure it is the same
418 // resource the client intended to use with the application
419 amJarRsrc.setTimestamp(destStatus.getModificationTime());
420 amJarRsrc.setSize(destStatus.getLen());
421 localResources.put("AppMaster.jar", amJarRsrc);
422
423 // Set the log4j properties if needed
424 if (!log4jPropFile.isEmpty()) {
425 Path log4jSrc = new Path(log4jPropFile);
426 Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
427 fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
428 FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
429 LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
430 log4jRsrc.setType(LocalResourceType.FILE);
431 log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
432 log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
433 log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
434 log4jRsrc.setSize(log4jFileStatus.getLen());
435 localResources.put("log4j.properties", log4jRsrc);
436 }
437
438 // The shell script has to be made available on the final container(s)
439 // where it will be executed.
440 // To do this, we need to first copy into the filesystem that is visible
441 // to the yarn framework.
442 // We do not need to set this as a local resource for the application
443 // master as the application master does not need it.
444 String hdfsShellScriptLocation = "";
445 long hdfsShellScriptLen = 0;
446 long hdfsShellScriptTimestamp = 0;
447 if (!shellScriptPath.isEmpty()) {
448 Path shellSrc = new Path(shellScriptPath);
449 String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
450 Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
451 fs.copyFromLocalFile(false, true, shellSrc, shellDst);
452 hdfsShellScriptLocation = shellDst.toUri().toString();
453 FileStatus shellFileStatus = fs.getFileStatus(shellDst);
454 hdfsShellScriptLen = shellFileStatus.getLen();
455 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
456 }
457
458 // Set local resource info into app master container launch context
459 amContainer.setLocalResources(localResources);
460
461 // Set the necessary security tokens as needed
462 //amContainer.setContainerTokens(containerToken);
463
464 // Set the env variables to be setup in the env where the application master will be run
465 LOG.info("Set the environment for the application master");
466 Map<String, String> env = new HashMap<String, String>();
467
468 // put location of shell script into env
469 // using the env info, the application master will create the correct local resource for the
470 // eventual containers that will be launched to execute the shell scripts
471 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
472 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
473 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
474
475 // Add AppMaster.jar location to classpath
476 // At some point we should not be required to add
477 // the hadoop specific classpaths to the env.
478 // It should be provided out of the box.
479 // For now setting all required classpaths including
480 // the classpath to "." for the application jar
481 StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
482 for (String c : conf.getStrings(
483 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
484 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
485 classPathEnv.append(':');
486 classPathEnv.append(c.trim());
487 }
488 classPathEnv.append(":./log4j.properties");
489
490 // add the runtime classpath needed for tests to work
491 String testRuntimeClassPath = Client.getTestRuntimeClasspath();
492 classPathEnv.append(':');
493 classPathEnv.append(testRuntimeClassPath);
494
495 env.put("CLASSPATH", classPathEnv.toString());
496
497 amContainer.setEnvironment(env);
498
499 // Set the necessary command to execute the application master
500 Vector<CharSequence> vargs = new Vector<CharSequence>(30);
501
502 // Set java executable command
503 LOG.info("Setting up app master command");
504 vargs.add("${JAVA_HOME}" + "/bin/java");
505 // Set Xmx based on am memory size
506 vargs.add("-Xmx" + amMemory + "m");
507 // Set class name
508 vargs.add(appMasterMainClass);
509 // Set params for Application Master
510 vargs.add("--container_memory " + String.valueOf(containerMemory));
511 vargs.add("--num_containers " + String.valueOf(numContainers));
512 vargs.add("--priority " + String.valueOf(shellCmdPriority));
513 if (!shellCommand.isEmpty()) {
514 vargs.add("--shell_command " + shellCommand + "");
515 }
516 if (!shellArgs.isEmpty()) {
517 vargs.add("--shell_args " + shellArgs + "");
518 }
519 for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
520 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
521 }
522 if (debugFlag) {
523 vargs.add("--debug");
524 }
525
526 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
527 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
528
529 // Get final commmand
530 StringBuilder command = new StringBuilder();
531 for (CharSequence str : vargs) {
532 command.append(str).append(" ");
533 }
534
535 LOG.info("Completed setting up app master command " + command.toString());
536 List<String> commands = new ArrayList<String>();
537 commands.add(command.toString());
538 amContainer.setCommands(commands);
539
540 // Set up resource type requirements
541 // For now, only memory is supported so we set memory requirements
542 Resource capability = Records.newRecord(Resource.class);
543 capability.setMemory(amMemory);
544 amContainer.setResource(capability);
545
546 // Service data is a binary blob that can be passed to the application
547 // Not needed in this scenario
548 // amContainer.setServiceData(serviceData);
549
550 // The following are not required for launching an application master
551 // amContainer.setContainerId(containerId);
552
553 appContext.setAMContainerSpec(amContainer);
554
555 // Set the priority for the application master
556 Priority pri = Records.newRecord(Priority.class);
557 // TODO - what is the range for priority? how to decide?
558 pri.setPriority(amPriority);
559 appContext.setPriority(pri);
560
561 // Set the queue to which this application is to be submitted in the RM
562 appContext.setQueue(amQueue);
563
564 // Submit the application to the applications manager
565 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
566 // Ignore the response as either a valid response object is returned on success
567 // or an exception thrown to denote some form of a failure
568 LOG.info("Submitting application to ASM");
569 super.submitApplication(appContext);
570
571 // TODO
572 // Try submitting the same request again
573 // app submission failure?
574
575 // Monitor the application
576 return monitorApplication(appId);
577
578 }
579
580 /**
581 * Monitor the submitted application for completion.
582 * Kill application if time expires.
583 * @param appId Application Id of application to be monitored
584 * @return true if application completed successfully
585 * @throws YarnRemoteException
586 */
587 private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
588
589 while (true) {
590
591 // Check app status every 1 second.
592 try {
593 Thread.sleep(1000);
594 } catch (InterruptedException e) {
595 LOG.debug("Thread sleep in monitoring loop interrupted");
596 }
597
598 // Get application report for the appId we are interested in
599 ApplicationReport report = super.getApplicationReport(appId);
600
601 LOG.info("Got application report from ASM for"
602 + ", appId=" + appId.getId()
603 + ", clientToken=" + report.getClientToken()
604 + ", appDiagnostics=" + report.getDiagnostics()
605 + ", appMasterHost=" + report.getHost()
606 + ", appQueue=" + report.getQueue()
607 + ", appMasterRpcPort=" + report.getRpcPort()
608 + ", appStartTime=" + report.getStartTime()
609 + ", yarnAppState=" + report.getYarnApplicationState().toString()
610 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
611 + ", appTrackingUrl=" + report.getTrackingUrl()
612 + ", appUser=" + report.getUser());
613
614 YarnApplicationState state = report.getYarnApplicationState();
615 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
616 if (YarnApplicationState.FINISHED == state) {
617 if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
618 LOG.info("Application has completed successfully. Breaking monitoring loop");
619 return true;
620 }
621 else {
622 LOG.info("Application did finished unsuccessfully."
623 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
624 + ". Breaking monitoring loop");
625 return false;
626 }
627 }
628 else if (YarnApplicationState.KILLED == state
629 || YarnApplicationState.FAILED == state) {
630 LOG.info("Application did not finish."
631 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
632 + ". Breaking monitoring loop");
633 return false;
634 }
635
636 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
637 LOG.info("Reached client specified timeout for application. Killing application");
638 forceKillApplication(appId);
639 return false;
640 }
641 }
642
643 }
644
645 /**
646 * Kill a submitted application by sending a call to the ASM
647 * @param appId Application Id to be killed.
648 * @throws YarnRemoteException
649 */
650 private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
651 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
652 // the same time.
653 // If yes, can we kill a particular attempt only?
654
655 // Response can be ignored as it is non-null on success or
656 // throws an exception in case of failures
657 super.killApplication(appId);
658 }
659
660 private static String getTestRuntimeClasspath() {
661
662 InputStream classpathFileStream = null;
663 BufferedReader reader = null;
664 String envClassPath = "";
665
666 LOG.info("Trying to generate classpath for app master from current thread's classpath");
667 try {
668
669 // Create classpath from generated classpath
670 // Check maven ppom.xml for generated classpath info
671 // Works if compile time env is same as runtime. Mainly tests.
672 ClassLoader thisClassLoader =
673 Thread.currentThread().getContextClassLoader();
674 String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
675 classpathFileStream =
676 thisClassLoader.getResourceAsStream(generatedClasspathFile);
677 if (classpathFileStream == null) {
678 LOG.info("Could not classpath resource from class loader");
679 return envClassPath;
680 }
681 LOG.info("Readable bytes from stream=" + classpathFileStream.available());
682 reader = new BufferedReader(new InputStreamReader(classpathFileStream));
683 String cp = reader.readLine();
684 if (cp != null) {
685 envClassPath += cp.trim() + ":";
686 }
687 // Put the file itself on classpath for tasks.
688 envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
689 } catch (IOException e) {
690 LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
691 }
692
693 try {
694 if (classpathFileStream != null) {
695 classpathFileStream.close();
696 }
697 if (reader != null) {
698 reader.close();
699 }
700 } catch (IOException e) {
701 LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
702 }
703 return envClassPath;
704 }
705
706 }