001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.applications.distributedshell;
020
021 import java.io.IOException;
022 import java.nio.ByteBuffer;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Vector;
028
029 import org.apache.commons.cli.CommandLine;
030 import org.apache.commons.cli.GnuParser;
031 import org.apache.commons.cli.HelpFormatter;
032 import org.apache.commons.cli.Option;
033 import org.apache.commons.cli.Options;
034 import org.apache.commons.cli.ParseException;
035 import org.apache.commons.io.IOUtils;
036 import org.apache.commons.lang.StringUtils;
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039 import org.apache.hadoop.classification.InterfaceAudience;
040 import org.apache.hadoop.classification.InterfaceStability;
041 import org.apache.hadoop.conf.Configuration;
042 import org.apache.hadoop.fs.FSDataOutputStream;
043 import org.apache.hadoop.fs.FileStatus;
044 import org.apache.hadoop.fs.FileSystem;
045 import org.apache.hadoop.fs.Path;
046 import org.apache.hadoop.fs.permission.FsPermission;
047 import org.apache.hadoop.io.DataOutputBuffer;
048 import org.apache.hadoop.security.Credentials;
049 import org.apache.hadoop.security.UserGroupInformation;
050 import org.apache.hadoop.security.token.Token;
051 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052 import org.apache.hadoop.yarn.api.ApplicationConstants;
053 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056 import org.apache.hadoop.yarn.api.records.ApplicationId;
057 import org.apache.hadoop.yarn.api.records.ApplicationReport;
058 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061 import org.apache.hadoop.yarn.api.records.LocalResource;
062 import org.apache.hadoop.yarn.api.records.LocalResourceType;
063 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064 import org.apache.hadoop.yarn.api.records.NodeReport;
065 import org.apache.hadoop.yarn.api.records.NodeState;
066 import org.apache.hadoop.yarn.api.records.Priority;
067 import org.apache.hadoop.yarn.api.records.QueueACL;
068 import org.apache.hadoop.yarn.api.records.QueueInfo;
069 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070 import org.apache.hadoop.yarn.api.records.Resource;
071 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
072 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
073 import org.apache.hadoop.yarn.client.api.YarnClient;
074 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
075 import org.apache.hadoop.yarn.conf.YarnConfiguration;
076 import org.apache.hadoop.yarn.exceptions.YarnException;
077 import org.apache.hadoop.yarn.util.ConverterUtils;
078 import org.apache.hadoop.yarn.util.Records;
079
080 /**
081 * Client for Distributed Shell application submission to YARN.
082 *
083 * <p> The distributed shell client allows an application master to be launched that in turn would run
084 * the provided shell command on a set of containers. </p>
085 *
086 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
087 *
088 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
089 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
090 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
092 *
093 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}.
094 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId}
095 * and application name, the priority assigned to the application and the queue
096 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
097 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which
098 * the {@link ApplicationMaster} is launched. </p>
099 *
100 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
101 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available
102 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the
 * {@link ApplicationMaster}. </p>
104 *
105 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the
106 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code>
107 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client
108 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
109 *
110 */
111 @InterfaceAudience.Public
112 @InterfaceStability.Unstable
113 public class Client {
114
115 private static final Log LOG = LogFactory.getLog(Client.class);
116
117 // Configuration
118 private Configuration conf;
119 private YarnClient yarnClient;
120 // Application master specific info to register a new Application with RM/ASM
121 private String appName = "";
122 // App master priority
123 private int amPriority = 0;
124 // Queue for App master
125 private String amQueue = "";
126 // Amt. of memory resource to request for to run the App Master
127 private int amMemory = 10;
128 // Amt. of virtual core resource to request for to run the App Master
129 private int amVCores = 1;
130
131 // Application master jar file
132 private String appMasterJar = "";
133 // Main class to invoke application master
134 private final String appMasterMainClass;
135
136 // Shell command to be executed
137 private String shellCommand = "";
138 // Location of shell script
139 private String shellScriptPath = "";
140 // Args to be passed to the shell command
141 private String[] shellArgs = new String[] {};
142 // Env variables to be setup for the shell command
143 private Map<String, String> shellEnv = new HashMap<String, String>();
144 // Shell Command Container priority
145 private int shellCmdPriority = 0;
146
147 // Amt of memory to request for container in which shell script will be executed
148 private int containerMemory = 10;
149 // Amt. of virtual cores to request for container in which shell script will be executed
150 private int containerVirtualCores = 1;
151 // No. of containers in which the shell script needs to be executed
152 private int numContainers = 1;
153
154 // log4j.properties file
155 // if available, add to local resources and set into classpath
156 private String log4jPropFile = "";
157
158 // Start time for client
159 private final long clientStartTime = System.currentTimeMillis();
160 // Timeout threshold for client. Kill app after time interval expires.
161 private long clientTimeout = 600000;
162
163 // flag to indicate whether to keep containers across application attempts.
164 private boolean keepContainers = false;
165
166 // Debug flag
167 boolean debugFlag = false;
168
169 // Command line options
170 private Options opts;
171
172 private static final String shellCommandPath = "shellCommands";
173 private static final String shellArgsPath = "shellArgs";
174 private static final String appMasterJarPath = "AppMaster.jar";
175 // Hardcoded path to custom log_properties
176 private static final String log4jPath = "log4j.properties";
177
178 public static final String SCRIPT_PATH = "ExecScript";
179
180 /**
181 * @param args Command line arguments
182 */
183 public static void main(String[] args) {
184 boolean result = false;
185 try {
186 Client client = new Client();
187 LOG.info("Initializing Client");
188 try {
189 boolean doRun = client.init(args);
190 if (!doRun) {
191 System.exit(0);
192 }
193 } catch (IllegalArgumentException e) {
194 System.err.println(e.getLocalizedMessage());
195 client.printUsage();
196 System.exit(-1);
197 }
198 result = client.run();
199 } catch (Throwable t) {
200 LOG.fatal("Error running CLient", t);
201 System.exit(1);
202 }
203 if (result) {
204 LOG.info("Application completed successfully");
205 System.exit(0);
206 }
207 LOG.error("Application failed to complete successfully");
208 System.exit(2);
209 }
210
/**
 * Creates a client that launches the distributed shell
 * {@code ApplicationMaster} class with the given configuration.
 *
 * @param conf configuration passed on to the delegated constructor
 * @throws Exception declared by the signature; kept for callers
 */
public Client(Configuration conf) throws Exception {
  this("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", conf);
}
218
/**
 * Creates a client for an arbitrary application master main class, creates
 * and initializes the YARN client, and registers every supported command
 * line option.
 *
 * @param appMasterMainClass main class used to start the application master
 * @param conf configuration used to initialize the YARN client
 */
Client(String appMasterMainClass, Configuration conf) {
  this.conf = conf;
  this.appMasterMainClass = appMasterMainClass;
  yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);
  opts = new Options();
  opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
  opts.addOption("timeout", true, "Application timeout in milliseconds");
  opts.addOption("master_memory", true,
      "Amount of memory in MB to be requested to run the application master");
  opts.addOption("master_vcores", true,
      "Amount of virtual cores to be requested to run the application master");
  opts.addOption("jar", true, "Jar file containing the application master");
  opts.addOption("shell_command", true, "Shell command to be executed by " +
      "the Application Master. Can only specify either --shell_command " +
      "or --shell_script");
  opts.addOption("shell_script", true, "Location of the shell script to be " +
      "executed. Can only specify either --shell_command or --shell_script");
  // The two sentences used to be glued together ("script.Multiple"); keep a
  // separating space so the generated help text reads correctly.
  opts.addOption("shell_args", true, "Command line args for the shell script. " +
      "Multiple args can be separated by empty space.");
  // shell_args may carry any number of values after the option name.
  opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
  opts.addOption("shell_env", true,
      "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
  opts.addOption("container_memory", true,
      "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("container_vcores", true,
      "Amount of virtual cores to be requested to run the shell command");
  opts.addOption("num_containers", true,
      "No. of containers on which the shell command needs to be executed");
  opts.addOption("log_properties", true, "log4j.properties file");
  opts.addOption("keep_containers_across_application_attempts", false,
      "Flag to indicate whether to keep containers across application attempts." +
      " If the flag is true, running containers will not be killed when" +
      " application attempt fails and these containers will be retrieved by" +
      " the new application attempt ");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("help", false, "Print usage");
}
255
/**
 * Creates a client backed by a freshly constructed {@link YarnConfiguration}.
 *
 * @throws Exception declared by the signature; kept for callers
 */
public Client() throws Exception {
  this(new YarnConfiguration());
}
261
262 /**
263 * Helper function to print out usage
264 */
265 private void printUsage() {
266 new HelpFormatter().printHelp("Client", opts);
267 }
268
269 /**
270 * Parse command line options
271 * @param args Parsed command line options
272 * @return Whether the init was successful to run the client
273 * @throws ParseException
274 */
275 public boolean init(String[] args) throws ParseException {
276
277 CommandLine cliParser = new GnuParser().parse(opts, args);
278
279 if (args.length == 0) {
280 throw new IllegalArgumentException("No args specified for client to initialize");
281 }
282
283 if (cliParser.hasOption("log_properties")) {
284 String log4jPath = cliParser.getOptionValue("log_properties");
285 try {
286 Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
287 } catch (Exception e) {
288 LOG.warn("Can not set up custom log4j properties. " + e);
289 }
290 }
291
292 if (cliParser.hasOption("help")) {
293 printUsage();
294 return false;
295 }
296
297 if (cliParser.hasOption("debug")) {
298 debugFlag = true;
299
300 }
301
302 if (cliParser.hasOption("keep_containers_across_application_attempts")) {
303 LOG.info("keep_containers_across_application_attempts");
304 keepContainers = true;
305 }
306
307 appName = cliParser.getOptionValue("appname", "DistributedShell");
308 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
309 amQueue = cliParser.getOptionValue("queue", "default");
310 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
311 amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
312
313 if (amMemory < 0) {
314 throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
315 + " Specified memory=" + amMemory);
316 }
317 if (amVCores < 0) {
318 throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
319 + " Specified virtual cores=" + amVCores);
320 }
321
322 if (!cliParser.hasOption("jar")) {
323 throw new IllegalArgumentException("No jar file specified for application master");
324 }
325
326 appMasterJar = cliParser.getOptionValue("jar");
327
328 if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
329 throw new IllegalArgumentException(
330 "No shell command or shell script specified to be executed by application master");
331 } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
332 throw new IllegalArgumentException("Can not specify shell_command option " +
333 "and shell_script option at the same time");
334 } else if (cliParser.hasOption("shell_command")) {
335 shellCommand = cliParser.getOptionValue("shell_command");
336 } else {
337 shellScriptPath = cliParser.getOptionValue("shell_script");
338 }
339 if (cliParser.hasOption("shell_args")) {
340 shellArgs = cliParser.getOptionValues("shell_args");
341 }
342 if (cliParser.hasOption("shell_env")) {
343 String envs[] = cliParser.getOptionValues("shell_env");
344 for (String env : envs) {
345 env = env.trim();
346 int index = env.indexOf('=');
347 if (index == -1) {
348 shellEnv.put(env, "");
349 continue;
350 }
351 String key = env.substring(0, index);
352 String val = "";
353 if (index < (env.length()-1)) {
354 val = env.substring(index+1);
355 }
356 shellEnv.put(key, val);
357 }
358 }
359 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
360
361 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
362 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
363 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
364
365 if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
366 throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
367 + " exiting."
368 + " Specified containerMemory=" + containerMemory
369 + ", containerVirtualCores=" + containerVirtualCores
370 + ", numContainer=" + numContainers);
371 }
372
373 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
374
375 log4jPropFile = cliParser.getOptionValue("log_properties", "");
376
377 return true;
378 }
379
380 /**
381 * Main run function for the client
382 * @return true if application completed successfully
383 * @throws IOException
384 * @throws YarnException
385 */
386 public boolean run() throws IOException, YarnException {
387
388 LOG.info("Running Client");
389 yarnClient.start();
390
391 YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
392 LOG.info("Got Cluster metric info from ASM"
393 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
394
395 List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
396 NodeState.RUNNING);
397 LOG.info("Got Cluster node info from ASM");
398 for (NodeReport node : clusterNodeReports) {
399 LOG.info("Got node report from ASM for"
400 + ", nodeId=" + node.getNodeId()
401 + ", nodeAddress" + node.getHttpAddress()
402 + ", nodeRackName" + node.getRackName()
403 + ", nodeNumContainers" + node.getNumContainers());
404 }
405
406 QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
407 LOG.info("Queue info"
408 + ", queueName=" + queueInfo.getQueueName()
409 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
410 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
411 + ", queueApplicationCount=" + queueInfo.getApplications().size()
412 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
413
414 List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
415 for (QueueUserACLInfo aclInfo : listAclInfo) {
416 for (QueueACL userAcl : aclInfo.getUserAcls()) {
417 LOG.info("User ACL Info for Queue"
418 + ", queueName=" + aclInfo.getQueueName()
419 + ", userAcl=" + userAcl.name());
420 }
421 }
422
423 // Get a new application id
424 YarnClientApplication app = yarnClient.createApplication();
425 GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
426 // TODO get min/max resource capabilities from RM and change memory ask if needed
427 // If we do not have min/max, we may not be able to correctly request
428 // the required resources from the RM for the app master
429 // Memory ask has to be a multiple of min and less than max.
430 // Dump out information about cluster capability as seen by the resource manager
431 int maxMem = appResponse.getMaximumResourceCapability().getMemory();
432 LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
433
434 // A resource ask cannot exceed the max.
435 if (amMemory > maxMem) {
436 LOG.info("AM memory specified above max threshold of cluster. Using max value."
437 + ", specified=" + amMemory
438 + ", max=" + maxMem);
439 amMemory = maxMem;
440 }
441
442 int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
443 LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);
444
445 if (amVCores > maxVCores) {
446 LOG.info("AM virtual cores specified above max threshold of cluster. "
447 + "Using max value." + ", specified=" + amVCores
448 + ", max=" + maxVCores);
449 amVCores = maxVCores;
450 }
451
452 // set the application name
453 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
454 ApplicationId appId = appContext.getApplicationId();
455
456 appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
457 appContext.setApplicationName(appName);
458
459 // Set up the container launch context for the application master
460 ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
461
462 // set local resources for the application master
463 // local files or archives as needed
464 // In this scenario, the jar file for the application master is part of the local resources
465 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
466
467 LOG.info("Copy App Master jar from local filesystem and add to local environment");
468 // Copy the application master jar to the filesystem
469 // Create a local resource to point to the destination jar path
470 FileSystem fs = FileSystem.get(conf);
471 addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
472 localResources, null);
473
474 // Set the log4j properties if needed
475 if (!log4jPropFile.isEmpty()) {
476 addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
477 localResources, null);
478 }
479
480 // The shell script has to be made available on the final container(s)
481 // where it will be executed.
482 // To do this, we need to first copy into the filesystem that is visible
483 // to the yarn framework.
484 // We do not need to set this as a local resource for the application
485 // master as the application master does not need it.
486 String hdfsShellScriptLocation = "";
487 long hdfsShellScriptLen = 0;
488 long hdfsShellScriptTimestamp = 0;
489 if (!shellScriptPath.isEmpty()) {
490 Path shellSrc = new Path(shellScriptPath);
491 String shellPathSuffix =
492 appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
493 Path shellDst =
494 new Path(fs.getHomeDirectory(), shellPathSuffix);
495 fs.copyFromLocalFile(false, true, shellSrc, shellDst);
496 hdfsShellScriptLocation = shellDst.toUri().toString();
497 FileStatus shellFileStatus = fs.getFileStatus(shellDst);
498 hdfsShellScriptLen = shellFileStatus.getLen();
499 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
500 }
501
502 if (!shellCommand.isEmpty()) {
503 addToLocalResources(fs, null, shellCommandPath, appId.toString(),
504 localResources, shellCommand);
505 }
506
507 if (shellArgs.length > 0) {
508 addToLocalResources(fs, null, shellArgsPath, appId.toString(),
509 localResources, StringUtils.join(shellArgs, " "));
510 }
511 // Set local resource info into app master container launch context
512 amContainer.setLocalResources(localResources);
513
514 // Set the necessary security tokens as needed
515 //amContainer.setContainerTokens(containerToken);
516
517 // Set the env variables to be setup in the env where the application master will be run
518 LOG.info("Set the environment for the application master");
519 Map<String, String> env = new HashMap<String, String>();
520
521 // put location of shell script into env
522 // using the env info, the application master will create the correct local resource for the
523 // eventual containers that will be launched to execute the shell scripts
524 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
525 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
526 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
527
528 // Add AppMaster.jar location to classpath
529 // At some point we should not be required to add
530 // the hadoop specific classpaths to the env.
531 // It should be provided out of the box.
532 // For now setting all required classpaths including
533 // the classpath to "." for the application jar
534 StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
535 .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
536 for (String c : conf.getStrings(
537 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
538 YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
539 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
540 classPathEnv.append(c.trim());
541 }
542 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
543 "./log4j.properties");
544
545 // add the runtime classpath needed for tests to work
546 if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
547 classPathEnv.append(':');
548 classPathEnv.append(System.getProperty("java.class.path"));
549 }
550
551 env.put("CLASSPATH", classPathEnv.toString());
552
553 amContainer.setEnvironment(env);
554
555 // Set the necessary command to execute the application master
556 Vector<CharSequence> vargs = new Vector<CharSequence>(30);
557
558 // Set java executable command
559 LOG.info("Setting up app master command");
560 vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
561 // Set Xmx based on am memory size
562 vargs.add("-Xmx" + amMemory + "m");
563 // Set class name
564 vargs.add(appMasterMainClass);
565 // Set params for Application Master
566 vargs.add("--container_memory " + String.valueOf(containerMemory));
567 vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
568 vargs.add("--num_containers " + String.valueOf(numContainers));
569 vargs.add("--priority " + String.valueOf(shellCmdPriority));
570
571 for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
572 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
573 }
574 if (debugFlag) {
575 vargs.add("--debug");
576 }
577
578 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
579 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
580
581 // Get final commmand
582 StringBuilder command = new StringBuilder();
583 for (CharSequence str : vargs) {
584 command.append(str).append(" ");
585 }
586
587 LOG.info("Completed setting up app master command " + command.toString());
588 List<String> commands = new ArrayList<String>();
589 commands.add(command.toString());
590 amContainer.setCommands(commands);
591
592 // Set up resource type requirements
593 // For now, both memory and vcores are supported, so we set memory and
594 // vcores requirements
595 Resource capability = Records.newRecord(Resource.class);
596 capability.setMemory(amMemory);
597 capability.setVirtualCores(amVCores);
598 appContext.setResource(capability);
599
600 // Service data is a binary blob that can be passed to the application
601 // Not needed in this scenario
602 // amContainer.setServiceData(serviceData);
603
604 // Setup security tokens
605 if (UserGroupInformation.isSecurityEnabled()) {
606 Credentials credentials = new Credentials();
607 String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
608 if (tokenRenewer == null || tokenRenewer.length() == 0) {
609 throw new IOException(
610 "Can't get Master Kerberos principal for the RM to use as renewer");
611 }
612
613 // For now, only getting tokens for the default file-system.
614 final Token<?> tokens[] =
615 fs.addDelegationTokens(tokenRenewer, credentials);
616 if (tokens != null) {
617 for (Token<?> token : tokens) {
618 LOG.info("Got dt for " + fs.getUri() + "; " + token);
619 }
620 }
621 DataOutputBuffer dob = new DataOutputBuffer();
622 credentials.writeTokenStorageToStream(dob);
623 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
624 amContainer.setTokens(fsTokens);
625 }
626
627 appContext.setAMContainerSpec(amContainer);
628
629 // Set the priority for the application master
630 Priority pri = Records.newRecord(Priority.class);
631 // TODO - what is the range for priority? how to decide?
632 pri.setPriority(amPriority);
633 appContext.setPriority(pri);
634
635 // Set the queue to which this application is to be submitted in the RM
636 appContext.setQueue(amQueue);
637
638 // Submit the application to the applications manager
639 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
640 // Ignore the response as either a valid response object is returned on success
641 // or an exception thrown to denote some form of a failure
642 LOG.info("Submitting application to ASM");
643
644 yarnClient.submitApplication(appContext);
645
646 // TODO
647 // Try submitting the same request again
648 // app submission failure?
649
650 // Monitor the application
651 return monitorApplication(appId);
652
653 }
654
655 /**
656 * Monitor the submitted application for completion.
657 * Kill application if time expires.
658 * @param appId Application Id of application to be monitored
659 * @return true if application completed successfully
660 * @throws YarnException
661 * @throws IOException
662 */
663 private boolean monitorApplication(ApplicationId appId)
664 throws YarnException, IOException {
665
666 while (true) {
667
668 // Check app status every 1 second.
669 try {
670 Thread.sleep(1000);
671 } catch (InterruptedException e) {
672 LOG.debug("Thread sleep in monitoring loop interrupted");
673 }
674
675 // Get application report for the appId we are interested in
676 ApplicationReport report = yarnClient.getApplicationReport(appId);
677
678 LOG.info("Got application report from ASM for"
679 + ", appId=" + appId.getId()
680 + ", clientToAMToken=" + report.getClientToAMToken()
681 + ", appDiagnostics=" + report.getDiagnostics()
682 + ", appMasterHost=" + report.getHost()
683 + ", appQueue=" + report.getQueue()
684 + ", appMasterRpcPort=" + report.getRpcPort()
685 + ", appStartTime=" + report.getStartTime()
686 + ", yarnAppState=" + report.getYarnApplicationState().toString()
687 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
688 + ", appTrackingUrl=" + report.getTrackingUrl()
689 + ", appUser=" + report.getUser());
690
691 YarnApplicationState state = report.getYarnApplicationState();
692 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
693 if (YarnApplicationState.FINISHED == state) {
694 if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
695 LOG.info("Application has completed successfully. Breaking monitoring loop");
696 return true;
697 }
698 else {
699 LOG.info("Application did finished unsuccessfully."
700 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
701 + ". Breaking monitoring loop");
702 return false;
703 }
704 }
705 else if (YarnApplicationState.KILLED == state
706 || YarnApplicationState.FAILED == state) {
707 LOG.info("Application did not finish."
708 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
709 + ". Breaking monitoring loop");
710 return false;
711 }
712
713 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
714 LOG.info("Reached client specified timeout for application. Killing application");
715 forceKillApplication(appId);
716 return false;
717 }
718 }
719
720 }
721
722 /**
723 * Kill a submitted application by sending a call to the ASM
724 * @param appId Application Id to be killed.
725 * @throws YarnException
726 * @throws IOException
727 */
728 private void forceKillApplication(ApplicationId appId)
729 throws YarnException, IOException {
730 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at
731 // the same time.
732 // If yes, can we kill a particular attempt only?
733
734 // Response can be ignored as it is non-null on success or
735 // throws an exception in case of failures
736 yarnClient.killApplication(appId);
737 }
738
739 private void addToLocalResources(FileSystem fs, String fileSrcPath,
740 String fileDstPath, String appId, Map<String, LocalResource> localResources,
741 String resources) throws IOException {
742 String suffix =
743 appName + "/" + appId + "/" + fileDstPath;
744 Path dst =
745 new Path(fs.getHomeDirectory(), suffix);
746 if (fileSrcPath == null) {
747 FSDataOutputStream ostream = null;
748 try {
749 ostream = FileSystem
750 .create(fs, dst, new FsPermission((short) 0710));
751 ostream.writeUTF(resources);
752 } finally {
753 IOUtils.closeQuietly(ostream);
754 }
755 } else {
756 fs.copyFromLocalFile(new Path(fileSrcPath), dst);
757 }
758 FileStatus scFileStatus = fs.getFileStatus(dst);
759 LocalResource scRsrc =
760 LocalResource.newInstance(
761 ConverterUtils.getYarnUrlFromURI(dst.toUri()),
762 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
763 scFileStatus.getLen(), scFileStatus.getModificationTime());
764 localResources.put(fileDstPath, scRsrc);
765 }
766 }