001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.Vector;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.atomic.AtomicInteger;
043
044import org.apache.commons.cli.CommandLine;
045import org.apache.commons.cli.GnuParser;
046import org.apache.commons.cli.HelpFormatter;
047import org.apache.commons.cli.Options;
048import org.apache.commons.cli.ParseException;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.apache.hadoop.classification.InterfaceAudience;
052import org.apache.hadoop.classification.InterfaceAudience.Private;
053import org.apache.hadoop.classification.InterfaceStability;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.Path;
057import org.apache.hadoop.io.DataOutputBuffer;
058import org.apache.hadoop.io.IOUtils;
059import org.apache.hadoop.net.NetUtils;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.UserGroupInformation;
062import org.apache.hadoop.security.token.Token;
063import org.apache.hadoop.util.ExitUtil;
064import org.apache.hadoop.util.Shell;
065import org.apache.hadoop.yarn.api.ApplicationConstants;
066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
068import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
075import org.apache.hadoop.yarn.api.records.Container;
076import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
077import org.apache.hadoop.yarn.api.records.ContainerId;
078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
079import org.apache.hadoop.yarn.api.records.ContainerState;
080import org.apache.hadoop.yarn.api.records.ContainerStatus;
081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
082import org.apache.hadoop.yarn.api.records.LocalResource;
083import org.apache.hadoop.yarn.api.records.LocalResourceType;
084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
085import org.apache.hadoop.yarn.api.records.NodeReport;
086import org.apache.hadoop.yarn.api.records.Priority;
087import org.apache.hadoop.yarn.api.records.Resource;
088import org.apache.hadoop.yarn.api.records.ResourceRequest;
089import org.apache.hadoop.yarn.api.records.URL;
090import org.apache.hadoop.yarn.api.records.UpdatedContainer;
091import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
092import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
093import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
094import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
095import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
096import org.apache.hadoop.yarn.client.api.TimelineClient;
097import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
098import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
099import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
100import org.apache.hadoop.yarn.conf.YarnConfiguration;
101import org.apache.hadoop.yarn.exceptions.YarnException;
102import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
103import org.apache.hadoop.yarn.util.ConverterUtils;
104import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
105import org.apache.log4j.LogManager;
106
107import com.google.common.annotations.VisibleForTesting;
108import com.sun.jersey.api.client.ClientHandlerException;
109
110/**
111 * An ApplicationMaster for executing shell commands on a set of launched
112 * containers using the YARN framework.
113 * 
114 * <p>
115 * This class is meant to act as an example on how to write yarn-based
116 * application masters.
117 * </p>
118 * 
119 * <p>
120 * The ApplicationMaster is started on a container by the
121 * <code>ResourceManager</code>'s launcher. The first thing that the
122 * <code>ApplicationMaster</code> needs to do is to connect and register itself
123 * with the <code>ResourceManager</code>. The registration sets up information
124 * within the <code>ResourceManager</code> regarding what host:port the
125 * ApplicationMaster is listening on to provide any form of functionality to a
 * client as well as a tracking URL that a client can use to keep track of
 * status/job history if needed. However, in the distributed shell, the
 * tracking URL and appMasterHost:appMasterRpcPort are not supported.
129 * </p>
130 * 
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call from the
 * <code>ApplicationMaster</code> to the <code>ResourceManager</code> acts as
 * a heartbeat.
 * </p>
 *
138 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
 * resource specifications such as node location and computational
 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
144 * responds with an {@link AllocateResponse} that informs the
145 * <code>ApplicationMaster</code> of the set of newly allocated containers,
146 * completed containers as well as current state of available resources.
147 * </p>
148 * 
149 * <p>
150 * For each allocated container, the <code>ApplicationMaster</code> can then set
151 * up the necessary launch context via {@link ContainerLaunchContext} to specify
152 * the allocated container id, local resources required by the executable, the
153 * environment to be setup for the executable, commands to execute, etc. and
154 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
155 * launch and execute the defined commands on the given allocated container.
156 * </p>
157 * 
 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched containers
 * either by querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed
 * containers, or by querying the {@link ContainerManagementProtocol} for the
 * status of an allocated container's {@link ContainerId}.
 * </p>
 *
165 * <p>
166 * After the job has been completed, the <code>ApplicationMaster</code> has to
167 * send a {@link FinishApplicationMasterRequest} to the
168 * <code>ResourceManager</code> to inform it that the
169 * <code>ApplicationMaster</code> has been completed.
170 */
171@InterfaceAudience.Public
172@InterfaceStability.Unstable
173public class ApplicationMaster {
174
175  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
176
177  @VisibleForTesting
178  @Private
179  public static enum DSEvent {
180    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
181  }
182  
183  @VisibleForTesting
184  @Private
185  public static enum DSEntity {
186    DS_APP_ATTEMPT, DS_CONTAINER
187  }
188
189  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";
190
191  // Configuration
192  private Configuration conf;
193
194  // Handle to communicate with the Resource Manager
195  @SuppressWarnings("rawtypes")
196  private AMRMClientAsync amRMClient;
197
198  // In both secure and non-secure modes, this points to the job-submitter.
199  @VisibleForTesting
200  UserGroupInformation appSubmitterUgi;
201
202  // Handle to communicate with the Node Manager
203  private NMClientAsync nmClientAsync;
204  // Listen to process the response from the Node Manager
205  private NMCallbackHandler containerListener;
206
207  // Application Attempt Id ( combination of attemptId and fail count )
208  @VisibleForTesting
209  protected ApplicationAttemptId appAttemptID;
210
211  // TODO
212  // For status update for clients - yet to be implemented
213  // Hostname of the container
214  private String appMasterHostname = "";
215  // Port on which the app master listens for status updates from clients
216  private int appMasterRpcPort = -1;
217  // Tracking url to which app master publishes info for clients to monitor
218  private String appMasterTrackingUrl = "";
219
220  // App Master configuration
221  // No. of containers to run shell command on
222  @VisibleForTesting
223  protected int numTotalContainers = 1;
224  // Memory to request for the container on which the shell command will run
225  private long containerMemory = 10;
226  // VirtualCores to request for the container on which the shell command will run
227  private int containerVirtualCores = 1;
228  // Priority of the request
229  private int requestPriority;
230
231  // Counter for completed containers ( complete denotes successful or failed )
232  private AtomicInteger numCompletedContainers = new AtomicInteger();
233  // Allocated container count so that we know how many containers has the RM
234  // allocated to us
235  @VisibleForTesting
236  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
237  // Count of failed containers
238  private AtomicInteger numFailedContainers = new AtomicInteger();
239  // Count of containers already requested from the RM
240  // Needed as once requested, we should not request for containers again.
241  // Only request for more if the original requirement changes.
242  @VisibleForTesting
243  protected AtomicInteger numRequestedContainers = new AtomicInteger();
244
245  // Shell command to be executed
246  private String shellCommand = "";
247  // Args to be passed to the shell command
248  private String shellArgs = "";
249  // Env variables to be setup for the shell command
250  private Map<String, String> shellEnv = new HashMap<String, String>();
251
252  // Location of shell script ( obtained from info set in env )
253  // Shell script path in fs
254  private String scriptPath = "";
255  // Timestamp needed for creating a local resource
256  private long shellScriptPathTimestamp = 0;
257  // File length needed for local resource
258  private long shellScriptPathLen = 0;
259
260  // Timeline domain ID
261  private String domainId = null;
262
263  // Hardcoded path to shell script in launch container's local env
264  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
265  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
266      + ".bat";
267
268  // Hardcoded path to custom log_properties
269  private static final String log4jPath = "log4j.properties";
270
271  private static final String shellCommandPath = "shellCommands";
272  private static final String shellArgsPath = "shellArgs";
273
274  private volatile boolean done;
275
276  private ByteBuffer allTokens;
277
278  // Launch threads
279  private List<Thread> launchThreads = new ArrayList<Thread>();
280
281  // Timeline Client
282  @VisibleForTesting
283  TimelineClient timelineClient;
284  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
285  static final String APPID_TIMELINE_FILTER_NAME = "appId";
286  static final String USER_TIMELINE_FILTER_NAME = "user";
287
288  private final String linux_bash_command = "bash";
289  private final String windows_command = "cmd /c";
290
291  private int yarnShellIdCounter = 1;
292
293  @VisibleForTesting
294  protected final Set<ContainerId> launchedContainers =
295      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
296
297  /**
298   * @param args Command line args
299   */
300  public static void main(String[] args) {
301    boolean result = false;
302    try {
303      ApplicationMaster appMaster = new ApplicationMaster();
304      LOG.info("Initializing ApplicationMaster");
305      boolean doRun = appMaster.init(args);
306      if (!doRun) {
307        System.exit(0);
308      }
309      appMaster.run();
310      result = appMaster.finish();
311    } catch (Throwable t) {
312      LOG.fatal("Error running ApplicationMaster", t);
313      LogManager.shutdown();
314      ExitUtil.terminate(1, t);
315    }
316    if (result) {
317      LOG.info("Application Master completed successfully. exiting");
318      System.exit(0);
319    } else {
320      LOG.info("Application Master failed. exiting");
321      System.exit(2);
322    }
323  }
324
325  /**
326   * Dump out contents of $CWD and the environment to stdout for debugging
327   */
328  private void dumpOutDebugInfo() {
329
330    LOG.info("Dump debug output");
331    Map<String, String> envs = System.getenv();
332    for (Map.Entry<String, String> env : envs.entrySet()) {
333      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
334      System.out.println("System env: key=" + env.getKey() + ", val="
335          + env.getValue());
336    }
337
338    BufferedReader buf = null;
339    try {
340      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
341        Shell.execCommand("ls", "-al");
342      buf = new BufferedReader(new StringReader(lines));
343      String line = "";
344      while ((line = buf.readLine()) != null) {
345        LOG.info("System CWD content: " + line);
346        System.out.println("System CWD content: " + line);
347      }
348    } catch (IOException e) {
349      e.printStackTrace();
350    } finally {
351      IOUtils.cleanup(LOG, buf);
352    }
353  }
354
355  public ApplicationMaster() {
356    // Set up the configuration
357    conf = new YarnConfiguration();
358  }
359
360  /**
361   * Parse command line options
362   *
363   * @param args Command line args
364   * @return Whether init successful and run should be invoked
365   * @throws ParseException
366   * @throws IOException
367   */
368  public boolean init(String[] args) throws ParseException, IOException {
369    Options opts = new Options();
370    opts.addOption("app_attempt_id", true,
371        "App Attempt ID. Not to be used unless for testing purposes");
372    opts.addOption("shell_env", true,
373        "Environment for shell script. Specified as env_key=env_val pairs");
374    opts.addOption("container_memory", true,
375        "Amount of memory in MB to be requested to run the shell command");
376    opts.addOption("container_vcores", true,
377        "Amount of virtual cores to be requested to run the shell command");
378    opts.addOption("num_containers", true,
379        "No. of containers on which the shell command needs to be executed");
380    opts.addOption("priority", true, "Application Priority. Default 0");
381    opts.addOption("debug", false, "Dump out debug information");
382
383    opts.addOption("help", false, "Print usage");
384    CommandLine cliParser = new GnuParser().parse(opts, args);
385
386    if (args.length == 0) {
387      printUsage(opts);
388      throw new IllegalArgumentException(
389          "No args specified for application master to initialize");
390    }
391
392    //Check whether customer log4j.properties file exists
393    if (fileExist(log4jPath)) {
394      try {
395        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
396            log4jPath);
397      } catch (Exception e) {
398        LOG.warn("Can not set up custom log4j properties. " + e);
399      }
400    }
401
402    if (cliParser.hasOption("help")) {
403      printUsage(opts);
404      return false;
405    }
406
407    if (cliParser.hasOption("debug")) {
408      dumpOutDebugInfo();
409    }
410
411    Map<String, String> envs = System.getenv();
412
413    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
414      if (cliParser.hasOption("app_attempt_id")) {
415        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
416        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
417      } else {
418        throw new IllegalArgumentException(
419            "Application Attempt Id not set in the environment");
420      }
421    } else {
422      ContainerId containerId = ContainerId.fromString(envs
423          .get(Environment.CONTAINER_ID.name()));
424      appAttemptID = containerId.getApplicationAttemptId();
425    }
426
427    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
428      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
429          + " not set in the environment");
430    }
431    if (!envs.containsKey(Environment.NM_HOST.name())) {
432      throw new RuntimeException(Environment.NM_HOST.name()
433          + " not set in the environment");
434    }
435    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
436      throw new RuntimeException(Environment.NM_HTTP_PORT
437          + " not set in the environment");
438    }
439    if (!envs.containsKey(Environment.NM_PORT.name())) {
440      throw new RuntimeException(Environment.NM_PORT.name()
441          + " not set in the environment");
442    }
443
444    LOG.info("Application master for app" + ", appId="
445        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
446        + appAttemptID.getApplicationId().getClusterTimestamp()
447        + ", attemptId=" + appAttemptID.getAttemptId());
448
449    if (!fileExist(shellCommandPath)
450        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
451      throw new IllegalArgumentException(
452          "No shell command or shell script specified to be executed by application master");
453    }
454
455    if (fileExist(shellCommandPath)) {
456      shellCommand = readContent(shellCommandPath);
457    }
458
459    if (fileExist(shellArgsPath)) {
460      shellArgs = readContent(shellArgsPath);
461    }
462
463    if (cliParser.hasOption("shell_env")) {
464      String shellEnvs[] = cliParser.getOptionValues("shell_env");
465      for (String env : shellEnvs) {
466        env = env.trim();
467        int index = env.indexOf('=');
468        if (index == -1) {
469          shellEnv.put(env, "");
470          continue;
471        }
472        String key = env.substring(0, index);
473        String val = "";
474        if (index < (env.length() - 1)) {
475          val = env.substring(index + 1);
476        }
477        shellEnv.put(key, val);
478      }
479    }
480
481    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
482      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
483
484      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
485        shellScriptPathTimestamp = Long.parseLong(envs
486            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
487      }
488      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
489        shellScriptPathLen = Long.parseLong(envs
490            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
491      }
492      if (!scriptPath.isEmpty()
493          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
494        LOG.error("Illegal values in env for shell script path" + ", path="
495            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
496            + shellScriptPathTimestamp);
497        throw new IllegalArgumentException(
498            "Illegal values in env for shell script path");
499      }
500    }
501
502    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
503      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
504    }
505
506    containerMemory = Integer.parseInt(cliParser.getOptionValue(
507        "container_memory", "10"));
508    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
509        "container_vcores", "1"));
510    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
511        "num_containers", "1"));
512    if (numTotalContainers == 0) {
513      throw new IllegalArgumentException(
514          "Cannot run distributed shell with no containers");
515    }
516    requestPriority = Integer.parseInt(cliParser
517        .getOptionValue("priority", "0"));
518    return true;
519  }
520
521  /**
522   * Helper function to print usage
523   *
524   * @param opts Parsed command line options
525   */
526  private void printUsage(Options opts) {
527    new HelpFormatter().printHelp("ApplicationMaster", opts);
528  }
529
530  /**
531   * Main run function for the application master
532   *
533   * @throws YarnException
534   * @throws IOException
535   */
536  @SuppressWarnings({ "unchecked" })
537  public void run() throws YarnException, IOException, InterruptedException {
538    LOG.info("Starting ApplicationMaster");
539
540    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
541    // are marked as LimitedPrivate
542    Credentials credentials =
543        UserGroupInformation.getCurrentUser().getCredentials();
544    DataOutputBuffer dob = new DataOutputBuffer();
545    credentials.writeTokenStorageToStream(dob);
546    // Now remove the AM->RM token so that containers cannot access it.
547    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
548    LOG.info("Executing with tokens:");
549    while (iter.hasNext()) {
550      Token<?> token = iter.next();
551      LOG.info(token);
552      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
553        iter.remove();
554      }
555    }
556    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
557
558    // Create appSubmitterUgi and add original tokens to it
559    String appSubmitterUserName =
560        System.getenv(ApplicationConstants.Environment.USER.name());
561    appSubmitterUgi =
562        UserGroupInformation.createRemoteUser(appSubmitterUserName);
563    appSubmitterUgi.addCredentials(credentials);
564
565
566    AMRMClientAsync.AbstractCallbackHandler allocListener =
567        new RMCallbackHandler();
568    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
569    amRMClient.init(conf);
570    amRMClient.start();
571
572    containerListener = createNMCallbackHandler();
573    nmClientAsync = new NMClientAsyncImpl(containerListener);
574    nmClientAsync.init(conf);
575    nmClientAsync.start();
576
577    startTimelineClient(conf);
578    if(timelineClient != null) {
579      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
580          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
581    }
582
583    // Setup local RPC Server to accept status requests directly from clients
584    // TODO need to setup a protocol for client to be able to communicate to
585    // the RPC server
586    // TODO use the rpc port info to register with the RM for the client to
587    // send requests to this app master
588
589    // Register self with ResourceManager
590    // This will start heartbeating to the RM
591    appMasterHostname = NetUtils.getHostname();
592    RegisterApplicationMasterResponse response = amRMClient
593        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
594            appMasterTrackingUrl);
595    // Dump out information about cluster capability as seen by the
596    // resource manager
597    long maxMem = response.getMaximumResourceCapability().getMemorySize();
598    LOG.info("Max mem capability of resources in this cluster " + maxMem);
599    
600    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
601    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
602
603    // A resource ask cannot exceed the max.
604    if (containerMemory > maxMem) {
605      LOG.info("Container memory specified above max threshold of cluster."
606          + " Using max value." + ", specified=" + containerMemory + ", max="
607          + maxMem);
608      containerMemory = maxMem;
609    }
610
611    if (containerVirtualCores > maxVCores) {
612      LOG.info("Container virtual cores specified above max threshold of cluster."
613          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
614          + maxVCores);
615      containerVirtualCores = maxVCores;
616    }
617
618    List<Container> previousAMRunningContainers =
619        response.getContainersFromPreviousAttempts();
620    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
621      + " previous attempts' running containers on AM registration.");
622    for(Container container: previousAMRunningContainers) {
623      launchedContainers.add(container.getId());
624    }
625    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
626
627
628    int numTotalContainersToRequest =
629        numTotalContainers - previousAMRunningContainers.size();
630    // Setup ask for containers from RM
631    // Send request for containers to RM
632    // Until we get our fully allocated quota, we keep on polling RM for
633    // containers
634    // Keep looping until all the containers are launched and shell script
635    // executed on them ( regardless of success/failure).
636    for (int i = 0; i < numTotalContainersToRequest; ++i) {
637      ContainerRequest containerAsk = setupContainerAskForRM();
638      amRMClient.addContainerRequest(containerAsk);
639    }
640    numRequestedContainers.set(numTotalContainers);
641  }
642
643  @VisibleForTesting
644  void startTimelineClient(final Configuration conf)
645      throws YarnException, IOException, InterruptedException {
646    try {
647      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
648        @Override
649        public Void run() throws Exception {
650          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
651              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
652            // Creating the Timeline Client
653            timelineClient = TimelineClient.createTimelineClient();
654            timelineClient.init(conf);
655            timelineClient.start();
656          } else {
657            timelineClient = null;
658            LOG.warn("Timeline service is not enabled");
659          }
660          return null;
661        }
662      });
663    } catch (UndeclaredThrowableException e) {
664      throw new YarnException(e.getCause());
665    }
666  }
667
668  @VisibleForTesting
669  NMCallbackHandler createNMCallbackHandler() {
670    return new NMCallbackHandler(this);
671  }
672
673  @VisibleForTesting
674  protected boolean finish() {
675    // wait for completion.
676    while (!done
677        && (numCompletedContainers.get() != numTotalContainers)) {
678      try {
679        Thread.sleep(200);
680      } catch (InterruptedException ex) {}
681    }
682
683    if(timelineClient != null) {
684      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
685          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
686    }
687
688    // Join all launched threads
689    // needed for when we time out
690    // and we need to release containers
691    for (Thread launchThread : launchThreads) {
692      try {
693        launchThread.join(10000);
694      } catch (InterruptedException e) {
695        LOG.info("Exception thrown in thread join: " + e.getMessage());
696        e.printStackTrace();
697      }
698    }
699
700    // When the application completes, it should stop all running containers
701    LOG.info("Application completed. Stopping running containers");
702    nmClientAsync.stop();
703
704    // When the application completes, it should send a finish application
705    // signal to the RM
706    LOG.info("Application completed. Signalling finish to RM");
707
708    FinalApplicationStatus appStatus;
709    String appMessage = null;
710    boolean success = true;
711    if (numFailedContainers.get() == 0 && 
712        numCompletedContainers.get() == numTotalContainers) {
713      appStatus = FinalApplicationStatus.SUCCEEDED;
714    } else {
715      appStatus = FinalApplicationStatus.FAILED;
716      appMessage = "Diagnostics." + ", total=" + numTotalContainers
717          + ", completed=" + numCompletedContainers.get() + ", allocated="
718          + numAllocatedContainers.get() + ", failed="
719          + numFailedContainers.get();
720      LOG.info(appMessage);
721      success = false;
722    }
723    try {
724      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
725    } catch (YarnException ex) {
726      LOG.error("Failed to unregister application", ex);
727    } catch (IOException e) {
728      LOG.error("Failed to unregister application", e);
729    }
730    
731    amRMClient.stop();
732
733    // Stop Timeline Client
734    if(timelineClient != null) {
735      timelineClient.stop();
736    }
737
738    return success;
739  }
740
741  @VisibleForTesting
742  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
743    @SuppressWarnings("unchecked")
744    @Override
745    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
746      LOG.info("Got response from RM for container ask, completedCnt="
747          + completedContainers.size());
748      for (ContainerStatus containerStatus : completedContainers) {
749        LOG.info(appAttemptID + " got container status for containerID="
750            + containerStatus.getContainerId() + ", state="
751            + containerStatus.getState() + ", exitStatus="
752            + containerStatus.getExitStatus() + ", diagnostics="
753            + containerStatus.getDiagnostics());
754
755        // non complete containers should not be here
756        assert (containerStatus.getState() == ContainerState.COMPLETE);
757        // ignore containers we know nothing about - probably from a previous
758        // attempt
759        if (!launchedContainers.contains(containerStatus.getContainerId())) {
760          LOG.info("Ignoring completed status of "
761              + containerStatus.getContainerId()
762              + "; unknown container(probably launched by previous attempt)");
763          continue;
764        }
765
766        // increment counters for completed/failed containers
767        int exitStatus = containerStatus.getExitStatus();
768        if (0 != exitStatus) {
769          // container failed
770          if (ContainerExitStatus.ABORTED != exitStatus) {
771            // shell script failed
772            // counts as completed
773            numCompletedContainers.incrementAndGet();
774            numFailedContainers.incrementAndGet();
775          } else {
776            // container was killed by framework, possibly preempted
777            // we should re-try as the container was lost for some reason
778            numAllocatedContainers.decrementAndGet();
779            numRequestedContainers.decrementAndGet();
780            // we do not need to release the container as it would be done
781            // by the RM
782          }
783        } else {
784          // nothing to do
785          // container completed successfully
786          numCompletedContainers.incrementAndGet();
787          LOG.info("Container completed successfully." + ", containerId="
788              + containerStatus.getContainerId());
789        }
790        if(timelineClient != null) {
791          publishContainerEndEvent(
792              timelineClient, containerStatus, domainId, appSubmitterUgi);
793        }
794      }
795      
796      // ask for more containers if any failed
797      int askCount = numTotalContainers - numRequestedContainers.get();
798      numRequestedContainers.addAndGet(askCount);
799
800      if (askCount > 0) {
801        for (int i = 0; i < askCount; ++i) {
802          ContainerRequest containerAsk = setupContainerAskForRM();
803          amRMClient.addContainerRequest(containerAsk);
804        }
805      }
806      
807      if (numCompletedContainers.get() == numTotalContainers) {
808        done = true;
809      }
810    }
811
812    @Override
813    public void onContainersAllocated(List<Container> allocatedContainers) {
814      LOG.info("Got response from RM for container ask, allocatedCnt="
815          + allocatedContainers.size());
816      numAllocatedContainers.addAndGet(allocatedContainers.size());
817      for (Container allocatedContainer : allocatedContainers) {
818        String yarnShellId = Integer.toString(yarnShellIdCounter);
819        yarnShellIdCounter++;
820        LOG.info("Launching shell command on a new container."
821            + ", containerId=" + allocatedContainer.getId()
822            + ", yarnShellId=" + yarnShellId
823            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
824            + ":" + allocatedContainer.getNodeId().getPort()
825            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
826            + ", containerResourceMemory"
827            + allocatedContainer.getResource().getMemorySize()
828            + ", containerResourceVirtualCores"
829            + allocatedContainer.getResource().getVirtualCores());
830        // + ", containerToken"
831        // +allocatedContainer.getContainerToken().getIdentifier().toString());
832
833        Thread launchThread = createLaunchContainerThread(allocatedContainer,
834            yarnShellId);
835
836        // launch and start the container on a separate thread to keep
837        // the main thread unblocked
838        // as all containers may not be allocated at one go.
839        launchThreads.add(launchThread);
840        launchedContainers.add(allocatedContainer.getId());
841        launchThread.start();
842      }
843    }
844
845    @Override
846    public void onContainersUpdated(
847        List<UpdatedContainer> containers) {}
848
849    @Override
850    public void onShutdownRequest() {
851      done = true;
852    }
853
854    @Override
855    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
856
857    @Override
858    public float getProgress() {
859      // set progress to deliver to RM on next heartbeat
860      float progress = (float) numCompletedContainers.get()
861          / numTotalContainers;
862      return progress;
863    }
864
865    @Override
866    public void onError(Throwable e) {
867      LOG.error("Error in RMCallbackHandler: ", e);
868      done = true;
869      amRMClient.stop();
870    }
871  }
872
873  @VisibleForTesting
874  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
875
876    private ConcurrentMap<ContainerId, Container> containers =
877        new ConcurrentHashMap<ContainerId, Container>();
878    private final ApplicationMaster applicationMaster;
879
880    public NMCallbackHandler(ApplicationMaster applicationMaster) {
881      this.applicationMaster = applicationMaster;
882    }
883
884    public void addContainer(ContainerId containerId, Container container) {
885      containers.putIfAbsent(containerId, container);
886    }
887
888    @Override
889    public void onContainerStopped(ContainerId containerId) {
890      if (LOG.isDebugEnabled()) {
891        LOG.debug("Succeeded to stop Container " + containerId);
892      }
893      containers.remove(containerId);
894    }
895
896    @Override
897    public void onContainerStatusReceived(ContainerId containerId,
898        ContainerStatus containerStatus) {
899      if (LOG.isDebugEnabled()) {
900        LOG.debug("Container Status: id=" + containerId + ", status=" +
901            containerStatus);
902      }
903    }
904
905    @Override
906    public void onContainerStarted(ContainerId containerId,
907        Map<String, ByteBuffer> allServiceResponse) {
908      if (LOG.isDebugEnabled()) {
909        LOG.debug("Succeeded to start Container " + containerId);
910      }
911      Container container = containers.get(containerId);
912      if (container != null) {
913        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
914      }
915      if(applicationMaster.timelineClient != null) {
916        applicationMaster.publishContainerStartEvent(
917            applicationMaster.timelineClient, container,
918            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
919      }
920    }
921
922    @Override
923    public void onContainerResourceIncreased(
924        ContainerId containerId, Resource resource) {}
925
926    @Override
927    public void onStartContainerError(ContainerId containerId, Throwable t) {
928      LOG.error("Failed to start Container " + containerId);
929      containers.remove(containerId);
930      applicationMaster.numCompletedContainers.incrementAndGet();
931      applicationMaster.numFailedContainers.incrementAndGet();
932    }
933
934    @Override
935    public void onGetContainerStatusError(
936        ContainerId containerId, Throwable t) {
937      LOG.error("Failed to query the status of Container " + containerId);
938    }
939
940    @Override
941    public void onStopContainerError(ContainerId containerId, Throwable t) {
942      LOG.error("Failed to stop Container " + containerId);
943      containers.remove(containerId);
944    }
945
946    @Override
947    public void onIncreaseContainerResourceError(
948        ContainerId containerId, Throwable t) {}
949
950  }
951
952  /**
953   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
954   * that will execute the shell command.
955   */
956  private class LaunchContainerRunnable implements Runnable {
957
958    // Allocated container
959    private Container container;
960    private String shellId;
961
962    NMCallbackHandler containerListener;
963
964    /**
965     * @param lcontainer Allocated container
966     * @param containerListener Callback handler of the container
967     */
968    public LaunchContainerRunnable(Container lcontainer,
969        NMCallbackHandler containerListener, String shellId) {
970      this.container = lcontainer;
971      this.containerListener = containerListener;
972      this.shellId = shellId;
973    }
974
975    @Override
976    /**
977     * Connects to CM, sets up container launch context 
978     * for shell command and eventually dispatches the container 
979     * start request to the CM. 
980     */
981    public void run() {
982      LOG.info("Setting up container launch container for containerid="
983          + container.getId() + " with shellid=" + shellId);
984
985      // Set the local resources
986      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
987
988      // The container for the eventual shell commands needs its own local
989      // resources too.
990      // In this scenario, if a shell script is specified, we need to have it
991      // copied and made available to the container.
992      if (!scriptPath.isEmpty()) {
993        Path renamedScriptPath = null;
994        if (Shell.WINDOWS) {
995          renamedScriptPath = new Path(scriptPath + ".bat");
996        } else {
997          renamedScriptPath = new Path(scriptPath + ".sh");
998        }
999
1000        try {
1001          // rename the script file based on the underlying OS syntax.
1002          renameScriptFile(renamedScriptPath);
1003        } catch (Exception e) {
1004          LOG.error(
1005              "Not able to add suffix (.bat/.sh) to the shell script filename",
1006              e);
1007          // We know we cannot continue launching the container
1008          // so we should release it.
1009          numCompletedContainers.incrementAndGet();
1010          numFailedContainers.incrementAndGet();
1011          return;
1012        }
1013
1014        URL yarnUrl = null;
1015        try {
1016          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
1017        } catch (URISyntaxException e) {
1018          LOG.error("Error when trying to use shell script path specified"
1019              + " in env, path=" + renamedScriptPath, e);
1020          // A failure scenario on bad input such as invalid shell script path
1021          // We know we cannot continue launching the container
1022          // so we should release it.
1023          // TODO
1024          numCompletedContainers.incrementAndGet();
1025          numFailedContainers.incrementAndGet();
1026          return;
1027        }
1028        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
1029          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
1030          shellScriptPathLen, shellScriptPathTimestamp);
1031        localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
1032            ExecShellStringPath, shellRsrc);
1033        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
1034      }
1035
1036      // Set the necessary command to execute on the allocated container
1037      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
1038
1039      // Set executable command
1040      vargs.add(shellCommand);
1041      // Set shell script path
1042      if (!scriptPath.isEmpty()) {
1043        vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
1044            : ExecShellStringPath);
1045      }
1046
1047      // Set args for the shell command if any
1048      vargs.add(shellArgs);
1049      // Add log redirect params
1050      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1051      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1052
1053      // Get final commmand
1054      StringBuilder command = new StringBuilder();
1055      for (CharSequence str : vargs) {
1056        command.append(str).append(" ");
1057      }
1058
1059      List<String> commands = new ArrayList<String>();
1060      commands.add(command.toString());
1061
1062      // Set up ContainerLaunchContext, setting local resource, environment,
1063      // command and token for constructor.
1064
1065      // Note for tokens: Set up tokens for the container too. Today, for normal
1066      // shell commands, the container in distribute-shell doesn't need any
1067      // tokens. We are populating them mainly for NodeManagers to be able to
1068      // download anyfiles in the distributed file-system. The tokens are
1069      // otherwise also useful in cases, for e.g., when one is running a
1070      // "hadoop dfs" command inside the distributed shell.
1071      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
1072      myShellEnv.put(YARN_SHELL_ID, shellId);
1073      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1074        localResources, myShellEnv, commands, null, allTokens.duplicate(),
1075          null);
1076      containerListener.addContainer(container.getId(), container);
1077      nmClientAsync.startContainerAsync(container, ctx);
1078    }
1079  }
1080
1081  private void renameScriptFile(final Path renamedScriptPath)
1082      throws IOException, InterruptedException {
1083    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1084      @Override
1085      public Void run() throws IOException {
1086        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1087        fs.rename(new Path(scriptPath), renamedScriptPath);
1088        return null;
1089      }
1090    });
1091    LOG.info("User " + appSubmitterUgi.getUserName()
1092        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1093  }
1094
1095  /**
1096   * Setup the request that will be sent to the RM for the container ask.
1097   *
1098   * @return the setup ResourceRequest to be sent to RM
1099   */
1100  private ContainerRequest setupContainerAskForRM() {
1101    // setup requirements for hosts
1102    // using * as any host will do for the distributed shell app
1103    // set the priority for the request
1104    // TODO - what is the range for priority? how to decide?
1105    Priority pri = Priority.newInstance(requestPriority);
1106
1107    // Set up resource type requirements
1108    // For now, memory and CPU are supported so we set memory and cpu requirements
1109    Resource capability = Resource.newInstance(containerMemory,
1110      containerVirtualCores);
1111
1112    ContainerRequest request = new ContainerRequest(capability, null, null,
1113        pri);
1114    LOG.info("Requested container ask: " + request.toString());
1115    return request;
1116  }
1117
1118  private boolean fileExist(String filePath) {
1119    return new File(filePath).exists();
1120  }
1121
1122  private String readContent(String filePath) throws IOException {
1123    DataInputStream ds = null;
1124    try {
1125      ds = new DataInputStream(new FileInputStream(filePath));
1126      return ds.readUTF();
1127    } finally {
1128      org.apache.commons.io.IOUtils.closeQuietly(ds);
1129    }
1130  }
1131
1132  private void publishContainerStartEvent(
1133      final TimelineClient timelineClient, final Container container,
1134      String domainId, UserGroupInformation ugi) {
1135    final TimelineEntity entity = new TimelineEntity();
1136    entity.setEntityId(container.getId().toString());
1137    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1138    entity.setDomainId(domainId);
1139    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1140    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
1141        .getApplicationAttemptId().getApplicationId().toString());
1142    TimelineEvent event = new TimelineEvent();
1143    event.setTimestamp(System.currentTimeMillis());
1144    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1145    event.addEventInfo("Node", container.getNodeId().toString());
1146    event.addEventInfo("Resources", container.getResource().toString());
1147    entity.addEvent(event);
1148
1149    try {
1150      processTimelineResponseErrors(
1151          putContainerEntity(timelineClient,
1152              container.getId().getApplicationAttemptId(),
1153              entity));
1154    } catch (YarnException | IOException | ClientHandlerException e) {
1155      LOG.error("Container start event could not be published for "
1156          + container.getId().toString(), e);
1157    }
1158  }
1159
1160  @VisibleForTesting
1161  void publishContainerEndEvent(
1162      final TimelineClient timelineClient, ContainerStatus container,
1163      String domainId, UserGroupInformation ugi) {
1164    final TimelineEntity entity = new TimelineEntity();
1165    entity.setEntityId(container.getContainerId().toString());
1166    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1167    entity.setDomainId(domainId);
1168    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1169    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
1170        container.getContainerId().getApplicationAttemptId()
1171            .getApplicationId().toString());
1172    TimelineEvent event = new TimelineEvent();
1173    event.setTimestamp(System.currentTimeMillis());
1174    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1175    event.addEventInfo("State", container.getState().name());
1176    event.addEventInfo("Exit Status", container.getExitStatus());
1177    entity.addEvent(event);
1178    try {
1179      processTimelineResponseErrors(
1180          putContainerEntity(timelineClient,
1181              container.getContainerId().getApplicationAttemptId(),
1182              entity));
1183    } catch (YarnException | IOException | ClientHandlerException e) {
1184      LOG.error("Container end event could not be published for "
1185          + container.getContainerId().toString(), e);
1186    }
1187  }
1188
1189  private TimelinePutResponse putContainerEntity(
1190      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
1191      TimelineEntity entity)
1192      throws YarnException, IOException {
1193    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
1194      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
1195          currAttemptId.getApplicationId(),
1196          CONTAINER_ENTITY_GROUP_ID);
1197      return timelineClient.putEntities(currAttemptId, groupId, entity);
1198    } else {
1199      return timelineClient.putEntities(entity);
1200    }
1201  }
1202
1203  private void publishApplicationAttemptEvent(
1204      final TimelineClient timelineClient, String appAttemptId,
1205      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1206    final TimelineEntity entity = new TimelineEntity();
1207    entity.setEntityId(appAttemptId);
1208    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1209    entity.setDomainId(domainId);
1210    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1211    TimelineEvent event = new TimelineEvent();
1212    event.setEventType(appEvent.toString());
1213    event.setTimestamp(System.currentTimeMillis());
1214    entity.addEvent(event);
1215    try {
1216      TimelinePutResponse response = timelineClient.putEntities(entity);
1217      processTimelineResponseErrors(response);
1218    } catch (YarnException | IOException | ClientHandlerException e) {
1219      LOG.error("App Attempt "
1220          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1221          + " event could not be published for "
1222          + appAttemptId.toString(), e);
1223    }
1224  }
1225
1226  private TimelinePutResponse processTimelineResponseErrors(
1227      TimelinePutResponse response) {
1228    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
1229    if (errors.size() == 0) {
1230      LOG.debug("Timeline entities are successfully put");
1231    } else {
1232      for (TimelinePutResponse.TimelinePutError error : errors) {
1233        LOG.error(
1234            "Error when publishing entity [" + error.getEntityType() + ","
1235                + error.getEntityId() + "], server side error code: "
1236                + error.getErrorCode());
1237      }
1238    }
1239    return response;
1240  }
1241
1242  RMCallbackHandler getRMCallbackHandler() {
1243    return new RMCallbackHandler();
1244  }
1245
1246  @VisibleForTesting
1247  void setAmRMClient(AMRMClientAsync client) {
1248    this.amRMClient = client;
1249  }
1250
1251  @VisibleForTesting
1252  int getNumCompletedContainers() {
1253    return numCompletedContainers.get();
1254  }
1255
1256  @VisibleForTesting
1257  boolean getDone() {
1258    return done;
1259  }
1260
1261  @VisibleForTesting
1262  Thread createLaunchContainerThread(Container allocatedContainer,
1263      String shellId) {
1264    LaunchContainerRunnable runnableLaunchContainer =
1265        new LaunchContainerRunnable(allocatedContainer, containerListener,
1266            shellId);
1267    return new Thread(runnableLaunchContainer);
1268  }
1269}