001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.Vector;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.atomic.AtomicInteger;
044
045import org.apache.commons.cli.CommandLine;
046import org.apache.commons.cli.GnuParser;
047import org.apache.commons.cli.HelpFormatter;
048import org.apache.commons.cli.Options;
049import org.apache.commons.cli.ParseException;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.apache.hadoop.classification.InterfaceAudience;
053import org.apache.hadoop.classification.InterfaceAudience.Private;
054import org.apache.hadoop.classification.InterfaceStability;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.io.DataOutputBuffer;
059import org.apache.hadoop.io.IOUtils;
060import org.apache.hadoop.net.NetUtils;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.ExitUtil;
065import org.apache.hadoop.util.Shell;
066import org.apache.hadoop.yarn.api.ApplicationConstants;
067import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
068import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
069import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
070import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
071import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
072import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
073import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
074import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
075import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
076import org.apache.hadoop.yarn.api.records.Container;
077import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
078import org.apache.hadoop.yarn.api.records.ContainerId;
079import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
080import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
081import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
082import org.apache.hadoop.yarn.api.records.ContainerState;
083import org.apache.hadoop.yarn.api.records.ContainerStatus;
084import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
085import org.apache.hadoop.yarn.api.records.LocalResource;
086import org.apache.hadoop.yarn.api.records.LocalResourceType;
087import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
088import org.apache.hadoop.yarn.api.records.NodeReport;
089import org.apache.hadoop.yarn.api.records.Priority;
090import org.apache.hadoop.yarn.api.records.Resource;
091import org.apache.hadoop.yarn.api.records.ResourceRequest;
092import org.apache.hadoop.yarn.api.records.URL;
093import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
094import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
095import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
096import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
097import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
098import org.apache.hadoop.yarn.client.api.TimelineClient;
099import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
100import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
101import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
102import org.apache.hadoop.yarn.conf.YarnConfiguration;
103import org.apache.hadoop.yarn.exceptions.YarnException;
104import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
105import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
106import org.apache.log4j.LogManager;
107
108import com.google.common.annotations.VisibleForTesting;
109import com.sun.jersey.api.client.ClientHandlerException;
110
111/**
112 * An ApplicationMaster for executing shell commands on a set of launched
113 * containers using the YARN framework.
114 * 
115 * <p>
116 * This class is meant to act as an example on how to write yarn-based
117 * application masters.
118 * </p>
119 * 
120 * <p>
121 * The ApplicationMaster is started on a container by the
122 * <code>ResourceManager</code>'s launcher. The first thing that the
123 * <code>ApplicationMaster</code> needs to do is to connect and register itself
124 * with the <code>ResourceManager</code>. The registration sets up information
125 * within the <code>ResourceManager</code> regarding what host:port the
126 * ApplicationMaster is listening on to provide any form of functionality to a
 * client as well as a tracking URL that a client can use to keep track of
 * status/job history if needed. However, in the distributed shell, the
 * tracking URL and appMasterHost:appMasterRpcPort are not supported.
130 * </p>
131 * 
132 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call from the
 * <code>ApplicationMaster</code> to the <code>ResourceManager</code> acts as a
 * heartbeat.
 * </p>
 *
139 * <p>
140 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
141 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
143 * resource specifications such as node location, computational
144 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
145 * responds with an {@link AllocateResponse} that informs the
146 * <code>ApplicationMaster</code> of the set of newly allocated containers,
147 * completed containers as well as current state of available resources.
148 * </p>
149 * 
150 * <p>
151 * For each allocated container, the <code>ApplicationMaster</code> can then set
152 * up the necessary launch context via {@link ContainerLaunchContext} to specify
153 * the allocated container id, local resources required by the executable, the
154 * environment to be setup for the executable, commands to execute, etc. and
155 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
156 * launch and execute the defined commands on the given allocated container.
157 * </p>
158 * 
159 * <p>
160 * The <code>ApplicationMaster</code> can monitor the launched container by
161 * either querying the <code>ResourceManager</code> using
162 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
163 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 * </p>
 *
166 * <p>
167 * After the job has been completed, the <code>ApplicationMaster</code> has to
168 * send a {@link FinishApplicationMasterRequest} to the
169 * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has been completed.
 * </p>
171 */
172@InterfaceAudience.Public
173@InterfaceStability.Unstable
174public class ApplicationMaster {
175
176  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
177
178  @VisibleForTesting
179  @Private
180  public static enum DSEvent {
181    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
182  }
183  
184  @VisibleForTesting
185  @Private
186  public static enum DSEntity {
187    DS_APP_ATTEMPT, DS_CONTAINER
188  }
189
190  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";
191
192  // Configuration
193  private Configuration conf;
194
195  // Handle to communicate with the Resource Manager
196  @SuppressWarnings("rawtypes")
197  private AMRMClientAsync amRMClient;
198
199  // In both secure and non-secure modes, this points to the job-submitter.
200  @VisibleForTesting
201  UserGroupInformation appSubmitterUgi;
202
203  // Handle to communicate with the Node Manager
204  private NMClientAsync nmClientAsync;
205  // Listen to process the response from the Node Manager
206  private NMCallbackHandler containerListener;
207
208  // Application Attempt Id ( combination of attemptId and fail count )
209  @VisibleForTesting
210  protected ApplicationAttemptId appAttemptID;
211
212  // TODO
213  // For status update for clients - yet to be implemented
214  // Hostname of the container
215  private String appMasterHostname = "";
216  // Port on which the app master listens for status updates from clients
217  private int appMasterRpcPort = -1;
218  // Tracking url to which app master publishes info for clients to monitor
219  private String appMasterTrackingUrl = "";
220
221  private boolean timelineServiceV2 = false;
222
223  // App Master configuration
224  // No. of containers to run shell command on
225  @VisibleForTesting
226  protected int numTotalContainers = 1;
227  // Memory to request for the container on which the shell command will run
228  private long containerMemory = 10;
229  // VirtualCores to request for the container on which the shell command will run
230  private int containerVirtualCores = 1;
231  // Priority of the request
232  private int requestPriority;
233
234  // Counter for completed containers ( complete denotes successful or failed )
235  private AtomicInteger numCompletedContainers = new AtomicInteger();
236  // Allocated container count so that we know how many containers has the RM
237  // allocated to us
238  @VisibleForTesting
239  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
240  // Count of failed containers
241  private AtomicInteger numFailedContainers = new AtomicInteger();
242  // Count of containers already requested from the RM
243  // Needed as once requested, we should not request for containers again.
244  // Only request for more if the original requirement changes.
245  @VisibleForTesting
246  protected AtomicInteger numRequestedContainers = new AtomicInteger();
247
248  // Shell command to be executed
249  private String shellCommand = "";
250  // Args to be passed to the shell command
251  private String shellArgs = "";
252  // Env variables to be setup for the shell command
253  private Map<String, String> shellEnv = new HashMap<String, String>();
254
255  // Location of shell script ( obtained from info set in env )
256  // Shell script path in fs
257  private String scriptPath = "";
258  // Timestamp needed for creating a local resource
259  private long shellScriptPathTimestamp = 0;
260  // File length needed for local resource
261  private long shellScriptPathLen = 0;
262
263  // Container retry options
264  private ContainerRetryPolicy containerRetryPolicy =
265      ContainerRetryPolicy.NEVER_RETRY;
266  private Set<Integer> containerRetryErrorCodes = null;
267  private int containerMaxRetries = 0;
268  private int containrRetryInterval = 0;
269
270  // Timeline domain ID
271  private String domainId = null;
272
273  // Hardcoded path to shell script in launch container's local env
274  private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH
275      + ".sh";
276  private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH
277      + ".bat";
278
279  // Hardcoded path to custom log_properties
280  private static final String log4jPath = "log4j.properties";
281
282  private static final String shellCommandPath = "shellCommands";
283  private static final String shellArgsPath = "shellArgs";
284
285  private volatile boolean done;
286
287  private ByteBuffer allTokens;
288
289  // Launch threads
290  private List<Thread> launchThreads = new ArrayList<Thread>();
291
292  // Timeline Client
293  @VisibleForTesting
294  TimelineClient timelineClient;
295  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
296  static final String APPID_TIMELINE_FILTER_NAME = "appId";
297  static final String USER_TIMELINE_FILTER_NAME = "user";
298
299  private final String linux_bash_command = "bash";
300  private final String windows_command = "cmd /c";
301
302  private int yarnShellIdCounter = 1;
303
304  @VisibleForTesting
305  protected final Set<ContainerId> launchedContainers =
306      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
307
308  /**
309   * @param args Command line args
310   */
311  public static void main(String[] args) {
312    boolean result = false;
313    try {
314      ApplicationMaster appMaster = new ApplicationMaster();
315      LOG.info("Initializing ApplicationMaster");
316      boolean doRun = appMaster.init(args);
317      if (!doRun) {
318        System.exit(0);
319      }
320      appMaster.run();
321      result = appMaster.finish();
322    } catch (Throwable t) {
323      LOG.fatal("Error running ApplicationMaster", t);
324      LogManager.shutdown();
325      ExitUtil.terminate(1, t);
326    }
327    if (result) {
328      LOG.info("Application Master completed successfully. exiting");
329      System.exit(0);
330    } else {
331      LOG.info("Application Master failed. exiting");
332      System.exit(2);
333    }
334  }
335
336  /**
337   * Dump out contents of $CWD and the environment to stdout for debugging
338   */
339  private void dumpOutDebugInfo() {
340
341    LOG.info("Dump debug output");
342    Map<String, String> envs = System.getenv();
343    for (Map.Entry<String, String> env : envs.entrySet()) {
344      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
345      System.out.println("System env: key=" + env.getKey() + ", val="
346          + env.getValue());
347    }
348
349    BufferedReader buf = null;
350    try {
351      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
352        Shell.execCommand("ls", "-al");
353      buf = new BufferedReader(new StringReader(lines));
354      String line = "";
355      while ((line = buf.readLine()) != null) {
356        LOG.info("System CWD content: " + line);
357        System.out.println("System CWD content: " + line);
358      }
359    } catch (IOException e) {
360      e.printStackTrace();
361    } finally {
362      IOUtils.cleanup(LOG, buf);
363    }
364  }
365
366  public ApplicationMaster() {
367    // Set up the configuration
368    conf = new YarnConfiguration();
369  }
370
371  /**
372   * Parse command line options
373   *
374   * @param args Command line args
375   * @return Whether init successful and run should be invoked
376   * @throws ParseException
377   * @throws IOException
378   */
379  public boolean init(String[] args) throws ParseException, IOException {
380    Options opts = new Options();
381    opts.addOption("app_attempt_id", true,
382        "App Attempt ID. Not to be used unless for testing purposes");
383    opts.addOption("shell_env", true,
384        "Environment for shell script. Specified as env_key=env_val pairs");
385    opts.addOption("container_memory", true,
386        "Amount of memory in MB to be requested to run the shell command");
387    opts.addOption("container_vcores", true,
388        "Amount of virtual cores to be requested to run the shell command");
389    opts.addOption("num_containers", true,
390        "No. of containers on which the shell command needs to be executed");
391    opts.addOption("priority", true, "Application Priority. Default 0");
392    opts.addOption("container_retry_policy", true,
393        "Retry policy when container fails to run, "
394            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
395            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
396    opts.addOption("container_retry_error_codes", true,
397        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
398            + "codes is specified with this option, "
399            + "e.g. --container_retry_error_codes 1,2,3");
400    opts.addOption("container_max_retries", true,
401        "If container could retry, it specifies max retires");
402    opts.addOption("container_retry_interval", true,
403        "Interval between each retry, unit is milliseconds");
404    opts.addOption("debug", false, "Dump out debug information");
405
406    opts.addOption("help", false, "Print usage");
407    CommandLine cliParser = new GnuParser().parse(opts, args);
408
409    if (args.length == 0) {
410      printUsage(opts);
411      throw new IllegalArgumentException(
412          "No args specified for application master to initialize");
413    }
414
415    //Check whether customer log4j.properties file exists
416    if (fileExist(log4jPath)) {
417      try {
418        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
419            log4jPath);
420      } catch (Exception e) {
421        LOG.warn("Can not set up custom log4j properties. " + e);
422      }
423    }
424
425    if (cliParser.hasOption("help")) {
426      printUsage(opts);
427      return false;
428    }
429
430    if (cliParser.hasOption("debug")) {
431      dumpOutDebugInfo();
432    }
433
434    Map<String, String> envs = System.getenv();
435
436    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
437      if (cliParser.hasOption("app_attempt_id")) {
438        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
439        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
440      } else {
441        throw new IllegalArgumentException(
442            "Application Attempt Id not set in the environment");
443      }
444    } else {
445      ContainerId containerId = ContainerId.fromString(envs
446          .get(Environment.CONTAINER_ID.name()));
447      appAttemptID = containerId.getApplicationAttemptId();
448    }
449
450    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
451      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
452          + " not set in the environment");
453    }
454    if (!envs.containsKey(Environment.NM_HOST.name())) {
455      throw new RuntimeException(Environment.NM_HOST.name()
456          + " not set in the environment");
457    }
458    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
459      throw new RuntimeException(Environment.NM_HTTP_PORT
460          + " not set in the environment");
461    }
462    if (!envs.containsKey(Environment.NM_PORT.name())) {
463      throw new RuntimeException(Environment.NM_PORT.name()
464          + " not set in the environment");
465    }
466
467    LOG.info("Application master for app" + ", appId="
468        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
469        + appAttemptID.getApplicationId().getClusterTimestamp()
470        + ", attemptId=" + appAttemptID.getAttemptId());
471
472    if (!fileExist(shellCommandPath)
473        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
474      throw new IllegalArgumentException(
475          "No shell command or shell script specified to be executed by application master");
476    }
477
478    if (fileExist(shellCommandPath)) {
479      shellCommand = readContent(shellCommandPath);
480    }
481
482    if (fileExist(shellArgsPath)) {
483      shellArgs = readContent(shellArgsPath);
484    }
485
486    if (cliParser.hasOption("shell_env")) {
487      String shellEnvs[] = cliParser.getOptionValues("shell_env");
488      for (String env : shellEnvs) {
489        env = env.trim();
490        int index = env.indexOf('=');
491        if (index == -1) {
492          shellEnv.put(env, "");
493          continue;
494        }
495        String key = env.substring(0, index);
496        String val = "";
497        if (index < (env.length() - 1)) {
498          val = env.substring(index + 1);
499        }
500        shellEnv.put(key, val);
501      }
502    }
503
504    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
505      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
506
507      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
508        shellScriptPathTimestamp = Long.parseLong(envs
509            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
510      }
511      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
512        shellScriptPathLen = Long.parseLong(envs
513            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
514      }
515      if (!scriptPath.isEmpty()
516          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
517        LOG.error("Illegal values in env for shell script path" + ", path="
518            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
519            + shellScriptPathTimestamp);
520        throw new IllegalArgumentException(
521            "Illegal values in env for shell script path");
522      }
523    }
524
525    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
526      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
527    }
528
529    containerMemory = Integer.parseInt(cliParser.getOptionValue(
530        "container_memory", "10"));
531    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
532        "container_vcores", "1"));
533    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
534        "num_containers", "1"));
535    if (numTotalContainers == 0) {
536      throw new IllegalArgumentException(
537          "Cannot run distributed shell with no containers");
538    }
539    requestPriority = Integer.parseInt(cliParser
540        .getOptionValue("priority", "0"));
541
542    containerRetryPolicy = ContainerRetryPolicy.values()[
543        Integer.parseInt(cliParser.getOptionValue(
544            "container_retry_policy", "0"))];
545    if (cliParser.hasOption("container_retry_error_codes")) {
546      containerRetryErrorCodes = new HashSet<>();
547      for (String errorCode :
548          cliParser.getOptionValue("container_retry_error_codes").split(",")) {
549        containerRetryErrorCodes.add(Integer.parseInt(errorCode));
550      }
551    }
552    containerMaxRetries = Integer.parseInt(
553        cliParser.getOptionValue("container_max_retries", "0"));
554    containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
555        "container_retry_interval", "0"));
556
557    if (YarnConfiguration.timelineServiceEnabled(conf)) {
558      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
559    } else {
560      timelineClient = null;
561      LOG.warn("Timeline service is not enabled");
562    }
563
564    return true;
565  }
566
567  /**
568   * Helper function to print usage
569   *
570   * @param opts Parsed command line options
571   */
572  private void printUsage(Options opts) {
573    new HelpFormatter().printHelp("ApplicationMaster", opts);
574  }
575
576  /**
577   * Main run function for the application master
578   *
579   * @throws YarnException
580   * @throws IOException
581   */
582  @SuppressWarnings({ "unchecked" })
583  public void run() throws YarnException, IOException, InterruptedException {
584    LOG.info("Starting ApplicationMaster");
585
586    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
587    // are marked as LimitedPrivate
588    Credentials credentials =
589        UserGroupInformation.getCurrentUser().getCredentials();
590    DataOutputBuffer dob = new DataOutputBuffer();
591    credentials.writeTokenStorageToStream(dob);
592    // Now remove the AM->RM token so that containers cannot access it.
593    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
594    LOG.info("Executing with tokens:");
595    while (iter.hasNext()) {
596      Token<?> token = iter.next();
597      LOG.info(token);
598      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
599        iter.remove();
600      }
601    }
602    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
603
604    // Create appSubmitterUgi and add original tokens to it
605    String appSubmitterUserName =
606        System.getenv(ApplicationConstants.Environment.USER.name());
607    appSubmitterUgi =
608        UserGroupInformation.createRemoteUser(appSubmitterUserName);
609    appSubmitterUgi.addCredentials(credentials);
610
611    AMRMClientAsync.AbstractCallbackHandler allocListener =
612        new RMCallbackHandler();
613    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
614    amRMClient.init(conf);
615    amRMClient.start();
616
617    containerListener = createNMCallbackHandler();
618    nmClientAsync = new NMClientAsyncImpl(containerListener);
619    nmClientAsync.init(conf);
620    nmClientAsync.start();
621
622    startTimelineClient(conf);
623    if (timelineServiceV2) {
624      // need to bind timelineClient
625      amRMClient.registerTimelineClient(timelineClient);
626    }
627    if(timelineClient != null) {
628      if (timelineServiceV2) {
629        publishApplicationAttemptEventOnTimelineServiceV2(
630            DSEvent.DS_APP_ATTEMPT_START);
631      } else {
632        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
633            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
634      }
635    }
636
637    // Setup local RPC Server to accept status requests directly from clients
638    // TODO need to setup a protocol for client to be able to communicate to
639    // the RPC server
640    // TODO use the rpc port info to register with the RM for the client to
641    // send requests to this app master
642
643    // Register self with ResourceManager
644    // This will start heartbeating to the RM
645    appMasterHostname = NetUtils.getHostname();
646    RegisterApplicationMasterResponse response = amRMClient
647        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
648            appMasterTrackingUrl);
649    // Dump out information about cluster capability as seen by the
650    // resource manager
651    long maxMem = response.getMaximumResourceCapability().getMemorySize();
652    LOG.info("Max mem capability of resources in this cluster " + maxMem);
653    
654    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
655    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
656
657    // A resource ask cannot exceed the max.
658    if (containerMemory > maxMem) {
659      LOG.info("Container memory specified above max threshold of cluster."
660          + " Using max value." + ", specified=" + containerMemory + ", max="
661          + maxMem);
662      containerMemory = maxMem;
663    }
664
665    if (containerVirtualCores > maxVCores) {
666      LOG.info("Container virtual cores specified above max threshold of cluster."
667          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
668          + maxVCores);
669      containerVirtualCores = maxVCores;
670    }
671
672    List<Container> previousAMRunningContainers =
673        response.getContainersFromPreviousAttempts();
674    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
675      + " previous attempts' running containers on AM registration.");
676    for(Container container: previousAMRunningContainers) {
677      launchedContainers.add(container.getId());
678    }
679    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
680
681
682    int numTotalContainersToRequest =
683        numTotalContainers - previousAMRunningContainers.size();
684    // Setup ask for containers from RM
685    // Send request for containers to RM
686    // Until we get our fully allocated quota, we keep on polling RM for
687    // containers
688    // Keep looping until all the containers are launched and shell script
689    // executed on them ( regardless of success/failure).
690    for (int i = 0; i < numTotalContainersToRequest; ++i) {
691      ContainerRequest containerAsk = setupContainerAskForRM();
692      amRMClient.addContainerRequest(containerAsk);
693    }
694    numRequestedContainers.set(numTotalContainers);
695  }
696
697  @VisibleForTesting
698  void startTimelineClient(final Configuration conf)
699      throws YarnException, IOException, InterruptedException {
700    try {
701      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
702        @Override
703        public Void run() throws Exception {
704          if (YarnConfiguration.timelineServiceEnabled(conf)) {
705            // Creating the Timeline Client
706            if (timelineServiceV2) {
707              timelineClient = TimelineClient.createTimelineClient(
708                  appAttemptID.getApplicationId());
709              LOG.info("Timeline service V2 client is enabled");
710            } else {
711              timelineClient = TimelineClient.createTimelineClient();
712              LOG.info("Timeline service V1 client is enabled");
713            }
714            timelineClient.init(conf);
715            timelineClient.start();
716          } else {
717            timelineClient = null;
718            LOG.warn("Timeline service is not enabled");
719          }
720          return null;
721        }
722      });
723    } catch (UndeclaredThrowableException e) {
724      throw new YarnException(e.getCause());
725    }
726  }
727
728  @VisibleForTesting
729  NMCallbackHandler createNMCallbackHandler() {
730    return new NMCallbackHandler(this);
731  }
732
733  @VisibleForTesting
734  protected boolean finish() {
735    // wait for completion.
736    while (!done
737        && (numCompletedContainers.get() != numTotalContainers)) {
738      try {
739        Thread.sleep(200);
740      } catch (InterruptedException ex) {}
741    }
742
743    if (timelineClient != null) {
744      if (timelineServiceV2) {
745        publishApplicationAttemptEventOnTimelineServiceV2(
746            DSEvent.DS_APP_ATTEMPT_END);
747      } else {
748        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
749            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
750      }
751    }
752
753    // Join all launched threads
754    // needed for when we time out
755    // and we need to release containers
756    for (Thread launchThread : launchThreads) {
757      try {
758        launchThread.join(10000);
759      } catch (InterruptedException e) {
760        LOG.info("Exception thrown in thread join: " + e.getMessage());
761        e.printStackTrace();
762      }
763    }
764
765    // When the application completes, it should stop all running containers
766    LOG.info("Application completed. Stopping running containers");
767    nmClientAsync.stop();
768
769    // When the application completes, it should send a finish application
770    // signal to the RM
771    LOG.info("Application completed. Signalling finish to RM");
772
773    FinalApplicationStatus appStatus;
774    String appMessage = null;
775    boolean success = true;
776    if (numCompletedContainers.get() - numFailedContainers.get()
777        >= numTotalContainers) {
778      appStatus = FinalApplicationStatus.SUCCEEDED;
779    } else {
780      appStatus = FinalApplicationStatus.FAILED;
781      appMessage = "Diagnostics." + ", total=" + numTotalContainers
782          + ", completed=" + numCompletedContainers.get() + ", allocated="
783          + numAllocatedContainers.get() + ", failed="
784          + numFailedContainers.get();
785      LOG.info(appMessage);
786      success = false;
787    }
788    try {
789      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
790    } catch (YarnException ex) {
791      LOG.error("Failed to unregister application", ex);
792    } catch (IOException e) {
793      LOG.error("Failed to unregister application", e);
794    }
795    
796    amRMClient.stop();
797
798    // Stop Timeline Client
799    if(timelineClient != null) {
800      timelineClient.stop();
801    }
802
803    return success;
804  }
805
  /**
   * Handles asynchronous responses from the ResourceManager. It does the
   * following:
   * <ul>
   *   <li>tracks completed containers and re-requests aborted ones;</li>
   *   <li>launches newly allocated containers on their own threads;</li>
   *   <li>reports progress;</li>
   *   <li>sets {@code done} on shutdown requests and on errors.</li>
   * </ul>
   */
  @VisibleForTesting
  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
    /**
     * Processes containers reported as finished. For each container this
     * attempt launched:
     * <ul>
     *   <li>exit status 0 counts as completed;</li>
     *   <li>a non-zero, non-ABORTED exit counts as completed and failed;</li>
     *   <li>ABORTED decrements the allocated and requested counters so that a
     *       replacement is asked for below;</li>
     *   <li>a container-end timeline event is published when a timeline
     *       client exists.</li>
     * </ul>
     * Afterwards, new requests are issued until the requested count is back
     * to {@code numTotalContainers}. {@code done} is set once every
     * container has completed.
     */
    // NOTE(review): the unchecked suppression is presumably for the
    // raw-typed amRMClient.addContainerRequest call below; confirm.
    @SuppressWarnings("unchecked")
    @Override
    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
      LOG.info("Got response from RM for container ask, completedCnt="
          + completedContainers.size());
      for (ContainerStatus containerStatus : completedContainers) {
        LOG.info(appAttemptID + " got container status for containerID="
            + containerStatus.getContainerId() + ", state="
            + containerStatus.getState() + ", exitStatus="
            + containerStatus.getExitStatus() + ", diagnostics="
            + containerStatus.getDiagnostics());

        // non complete containers should not be here
        assert (containerStatus.getState() == ContainerState.COMPLETE);
        // ignore containers we know nothing about - probably from a previous
        // attempt
        if (!launchedContainers.contains(containerStatus.getContainerId())) {
          LOG.info("Ignoring completed status of "
              + containerStatus.getContainerId()
              + "; unknown container(probably launched by previous attempt)");
          continue;
        }

        // increment counters for completed/failed containers
        int exitStatus = containerStatus.getExitStatus();
        if (0 != exitStatus) {
          // container failed
          if (ContainerExitStatus.ABORTED != exitStatus) {
            // shell script failed
            // counts as completed
            numCompletedContainers.incrementAndGet();
            numFailedContainers.incrementAndGet();
          } else {
            // container was killed by framework, possibly preempted
            // we should re-try as the container was lost for some reason
            // Lowering numRequestedContainers makes askCount below positive,
            // which triggers a replacement request.
            numAllocatedContainers.decrementAndGet();
            numRequestedContainers.decrementAndGet();
            // we do not need to release the container as it would be done
            // by the RM
          }
        } else {
          // nothing to do
          // container completed successfully
          numCompletedContainers.incrementAndGet();
          LOG.info("Container completed successfully." + ", containerId="
              + containerStatus.getContainerId());
        }
        if(timelineClient != null) {
          if (timelineServiceV2) {
            publishContainerEndEventOnTimelineServiceV2(containerStatus);
          } else {
            publishContainerEndEvent(
                timelineClient, containerStatus, domainId, appSubmitterUgi);
          }
        }
      }
      
      // ask for more containers if any failed
      // (only ABORTED containers lower numRequestedContainers above, so this
      // re-requests exactly the ones lost to the framework)
      int askCount = numTotalContainers - numRequestedContainers.get();
      numRequestedContainers.addAndGet(askCount);

      if (askCount > 0) {
        for (int i = 0; i < askCount; ++i) {
          ContainerRequest containerAsk = setupContainerAskForRM();
          amRMClient.addContainerRequest(containerAsk);
        }
      }
      
      if (numCompletedContainers.get() == numTotalContainers) {
        done = true;
      }
    }

    /**
     * Launches each newly allocated container on a dedicated thread. Each
     * container gets the next value of {@code yarnShellIdCounter} as its shell
     * id and is recorded in {@code launchThreads} and
     * {@code launchedContainers}.
     */
    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      LOG.info("Got response from RM for container ask, allocatedCnt="
          + allocatedContainers.size());
      numAllocatedContainers.addAndGet(allocatedContainers.size());
      for (Container allocatedContainer : allocatedContainers) {
        // NOTE(review): yarnShellIdCounter is read and incremented without
        // synchronization; this presumably relies on RM callbacks being
        // delivered on a single thread. TODO confirm.
        String yarnShellId = Integer.toString(yarnShellIdCounter);
        yarnShellIdCounter++;
        LOG.info("Launching shell command on a new container."
            + ", containerId=" + allocatedContainer.getId()
            + ", yarnShellId=" + yarnShellId
            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
            + ":" + allocatedContainer.getNodeId().getPort()
            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
            + ", containerResourceMemory"
            + allocatedContainer.getResource().getMemorySize()
            + ", containerResourceVirtualCores"
            + allocatedContainer.getResource().getVirtualCores());
        // + ", containerToken"
        // +allocatedContainer.getContainerToken().getIdentifier().toString());

        Thread launchThread = createLaunchContainerThread(allocatedContainer,
            yarnShellId);

        // launch and start the container on a separate thread to keep
        // the main thread unblocked
        // as all containers may not be allocated at one go.
        launchThreads.add(launchThread);
        launchedContainers.add(allocatedContainer.getId());
        launchThread.start();
      }
    }

    // Resource changes are not used by the distributed shell.
    @Override
    public void onContainersResourceChanged(List<Container> containers) {}

    /** Marks the application done so that finish() stops waiting. */
    @Override
    public void onShutdownRequest() {
      done = true;
    }

    // Node updates are not used by the distributed shell.
    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {}

    /**
     * @return the fraction of containers completed so far, i.e.
     *         {@code numCompletedContainers / numTotalContainers}
     */
    @Override
    public float getProgress() {
      // set progress to deliver to RM on next heartbeat
      float progress = (float) numCompletedContainers.get()
          / numTotalContainers;
      return progress;
    }

    /**
     * Logs the error, marks the application done, and stops the RM client.
     * NOTE(review): finish() calls unregisterApplicationMaster and stop() on
     * amRMClient. If it runs after this handler, the client will already
     * have been stopped here.
     */
    @Override
    public void onError(Throwable e) {
      LOG.error("Error in RMCallbackHandler: ", e);
      done = true;
      amRMClient.stop();
    }
  }
940
941  @VisibleForTesting
942  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
943
944    private ConcurrentMap<ContainerId, Container> containers =
945        new ConcurrentHashMap<ContainerId, Container>();
946    private final ApplicationMaster applicationMaster;
947
948    public NMCallbackHandler(ApplicationMaster applicationMaster) {
949      this.applicationMaster = applicationMaster;
950    }
951
952    public void addContainer(ContainerId containerId, Container container) {
953      containers.putIfAbsent(containerId, container);
954    }
955
956    @Override
957    public void onContainerStopped(ContainerId containerId) {
958      if (LOG.isDebugEnabled()) {
959        LOG.debug("Succeeded to stop Container " + containerId);
960      }
961      containers.remove(containerId);
962    }
963
964    @Override
965    public void onContainerStatusReceived(ContainerId containerId,
966        ContainerStatus containerStatus) {
967      if (LOG.isDebugEnabled()) {
968        LOG.debug("Container Status: id=" + containerId + ", status=" +
969            containerStatus);
970      }
971    }
972
973    @Override
974    public void onContainerStarted(ContainerId containerId,
975        Map<String, ByteBuffer> allServiceResponse) {
976      if (LOG.isDebugEnabled()) {
977        LOG.debug("Succeeded to start Container " + containerId);
978      }
979      Container container = containers.get(containerId);
980      if (container != null) {
981        applicationMaster.nmClientAsync.getContainerStatusAsync(
982            containerId, container.getNodeId());
983      }
984      if(applicationMaster.timelineClient != null) {
985        if (applicationMaster.timelineServiceV2) {
986          applicationMaster.publishContainerStartEventOnTimelineServiceV2(
987              container);
988        } else {
989          applicationMaster.publishContainerStartEvent(
990              applicationMaster.timelineClient, container,
991              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
992        }
993      }
994    }
995
996    @Override
997    public void onContainerResourceIncreased(
998        ContainerId containerId, Resource resource) {}
999
1000    @Override
1001    public void onStartContainerError(ContainerId containerId, Throwable t) {
1002      LOG.error("Failed to start Container " + containerId);
1003      containers.remove(containerId);
1004      applicationMaster.numCompletedContainers.incrementAndGet();
1005      applicationMaster.numFailedContainers.incrementAndGet();
1006    }
1007
1008    @Override
1009    public void onGetContainerStatusError(
1010        ContainerId containerId, Throwable t) {
1011      LOG.error("Failed to query the status of Container " + containerId);
1012    }
1013
1014    @Override
1015    public void onStopContainerError(ContainerId containerId, Throwable t) {
1016      LOG.error("Failed to stop Container " + containerId);
1017      containers.remove(containerId);
1018    }
1019
1020    @Override
1021    public void onIncreaseContainerResourceError(
1022        ContainerId containerId, Throwable t) {}
1023
1024  }
1025
1026  /**
1027   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
1028   * that will execute the shell command.
1029   */
1030  private class LaunchContainerRunnable implements Runnable {
1031
1032    // Allocated container
1033    private Container container;
1034    private String shellId;
1035
1036    NMCallbackHandler containerListener;
1037
1038    /**
1039     * @param lcontainer Allocated container
1040     * @param containerListener Callback handler of the container
1041     */
1042    public LaunchContainerRunnable(Container lcontainer,
1043        NMCallbackHandler containerListener, String shellId) {
1044      this.container = lcontainer;
1045      this.containerListener = containerListener;
1046      this.shellId = shellId;
1047    }
1048
1049    @Override
1050    /**
1051     * Connects to CM, sets up container launch context 
1052     * for shell command and eventually dispatches the container 
1053     * start request to the CM. 
1054     */
1055    public void run() {
1056      LOG.info("Setting up container launch container for containerid="
1057          + container.getId() + " with shellid=" + shellId);
1058
1059      // Set the local resources
1060      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
1061
1062      // The container for the eventual shell commands needs its own local
1063      // resources too.
1064      // In this scenario, if a shell script is specified, we need to have it
1065      // copied and made available to the container.
1066      if (!scriptPath.isEmpty()) {
1067        Path renamedScriptPath = null;
1068        if (Shell.WINDOWS) {
1069          renamedScriptPath = new Path(scriptPath + ".bat");
1070        } else {
1071          renamedScriptPath = new Path(scriptPath + ".sh");
1072        }
1073
1074        try {
1075          // rename the script file based on the underlying OS syntax.
1076          renameScriptFile(renamedScriptPath);
1077        } catch (Exception e) {
1078          LOG.error(
1079              "Not able to add suffix (.bat/.sh) to the shell script filename",
1080              e);
1081          // We know we cannot continue launching the container
1082          // so we should release it.
1083          numCompletedContainers.incrementAndGet();
1084          numFailedContainers.incrementAndGet();
1085          return;
1086        }
1087
1088        URL yarnUrl = null;
1089        try {
1090          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
1091        } catch (URISyntaxException e) {
1092          LOG.error("Error when trying to use shell script path specified"
1093              + " in env, path=" + renamedScriptPath, e);
1094          // A failure scenario on bad input such as invalid shell script path
1095          // We know we cannot continue launching the container
1096          // so we should release it.
1097          // TODO
1098          numCompletedContainers.incrementAndGet();
1099          numFailedContainers.incrementAndGet();
1100          return;
1101        }
1102        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
1103          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
1104          shellScriptPathLen, shellScriptPathTimestamp);
1105        localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
1106            EXEC_SHELL_STRING_PATH, shellRsrc);
1107        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
1108      }
1109
1110      // Set the necessary command to execute on the allocated container
1111      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
1112
1113      // Set executable command
1114      vargs.add(shellCommand);
1115      // Set shell script path
1116      if (!scriptPath.isEmpty()) {
1117        vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
1118            : EXEC_SHELL_STRING_PATH);
1119      }
1120
1121      // Set args for the shell command if any
1122      vargs.add(shellArgs);
1123      // Add log redirect params
1124      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1125      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1126
1127      // Get final commmand
1128      StringBuilder command = new StringBuilder();
1129      for (CharSequence str : vargs) {
1130        command.append(str).append(" ");
1131      }
1132
1133      List<String> commands = new ArrayList<String>();
1134      commands.add(command.toString());
1135
1136      // Set up ContainerLaunchContext, setting local resource, environment,
1137      // command and token for constructor.
1138
1139      // Note for tokens: Set up tokens for the container too. Today, for normal
1140      // shell commands, the container in distribute-shell doesn't need any
1141      // tokens. We are populating them mainly for NodeManagers to be able to
1142      // download anyfiles in the distributed file-system. The tokens are
1143      // otherwise also useful in cases, for e.g., when one is running a
1144      // "hadoop dfs" command inside the distributed shell.
1145      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
1146      myShellEnv.put(YARN_SHELL_ID, shellId);
1147      ContainerRetryContext containerRetryContext =
1148          ContainerRetryContext.newInstance(
1149              containerRetryPolicy, containerRetryErrorCodes,
1150              containerMaxRetries, containrRetryInterval);
1151      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1152        localResources, myShellEnv, commands, null, allTokens.duplicate(),
1153          null, containerRetryContext);
1154      containerListener.addContainer(container.getId(), container);
1155      nmClientAsync.startContainerAsync(container, ctx);
1156    }
1157  }
1158
1159  private void renameScriptFile(final Path renamedScriptPath)
1160      throws IOException, InterruptedException {
1161    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1162      @Override
1163      public Void run() throws IOException {
1164        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1165        fs.rename(new Path(scriptPath), renamedScriptPath);
1166        return null;
1167      }
1168    });
1169    LOG.info("User " + appSubmitterUgi.getUserName()
1170        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1171  }
1172
1173  /**
1174   * Setup the request that will be sent to the RM for the container ask.
1175   *
1176   * @return the setup ResourceRequest to be sent to RM
1177   */
1178  private ContainerRequest setupContainerAskForRM() {
1179    // setup requirements for hosts
1180    // using * as any host will do for the distributed shell app
1181    // set the priority for the request
1182    // TODO - what is the range for priority? how to decide?
1183    Priority pri = Priority.newInstance(requestPriority);
1184
1185    // Set up resource type requirements
1186    // For now, memory and CPU are supported so we set memory and cpu requirements
1187    Resource capability = Resource.newInstance(containerMemory,
1188      containerVirtualCores);
1189
1190    ContainerRequest request = new ContainerRequest(capability, null, null,
1191        pri);
1192    LOG.info("Requested container ask: " + request.toString());
1193    return request;
1194  }
1195
1196  private boolean fileExist(String filePath) {
1197    return new File(filePath).exists();
1198  }
1199
1200  private String readContent(String filePath) throws IOException {
1201    DataInputStream ds = null;
1202    try {
1203      ds = new DataInputStream(new FileInputStream(filePath));
1204      return ds.readUTF();
1205    } finally {
1206      org.apache.commons.io.IOUtils.closeQuietly(ds);
1207    }
1208  }
1209
1210  private void publishContainerStartEvent(
1211      final TimelineClient timelineClient, final Container container,
1212      String domainId, UserGroupInformation ugi) {
1213    final TimelineEntity entity = new TimelineEntity();
1214    entity.setEntityId(container.getId().toString());
1215    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1216    entity.setDomainId(domainId);
1217    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1218    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
1219        .getApplicationAttemptId().getApplicationId().toString());
1220    TimelineEvent event = new TimelineEvent();
1221    event.setTimestamp(System.currentTimeMillis());
1222    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1223    event.addEventInfo("Node", container.getNodeId().toString());
1224    event.addEventInfo("Resources", container.getResource().toString());
1225    entity.addEvent(event);
1226
1227    try {
1228      processTimelineResponseErrors(
1229          putContainerEntity(timelineClient,
1230              container.getId().getApplicationAttemptId(),
1231              entity));
1232    } catch (YarnException | IOException | ClientHandlerException e) {
1233      LOG.error("Container start event could not be published for "
1234          + container.getId().toString(), e);
1235    }
1236  }
1237
1238  @VisibleForTesting
1239  void publishContainerEndEvent(
1240      final TimelineClient timelineClient, ContainerStatus container,
1241      String domainId, UserGroupInformation ugi) {
1242    final TimelineEntity entity = new TimelineEntity();
1243    entity.setEntityId(container.getContainerId().toString());
1244    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1245    entity.setDomainId(domainId);
1246    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1247    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
1248        container.getContainerId().getApplicationAttemptId()
1249            .getApplicationId().toString());
1250    TimelineEvent event = new TimelineEvent();
1251    event.setTimestamp(System.currentTimeMillis());
1252    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1253    event.addEventInfo("State", container.getState().name());
1254    event.addEventInfo("Exit Status", container.getExitStatus());
1255    entity.addEvent(event);
1256    try {
1257      processTimelineResponseErrors(
1258          putContainerEntity(timelineClient,
1259              container.getContainerId().getApplicationAttemptId(),
1260              entity));
1261    } catch (YarnException | IOException | ClientHandlerException e) {
1262      LOG.error("Container end event could not be published for "
1263          + container.getContainerId().toString(), e);
1264    }
1265  }
1266
1267  private TimelinePutResponse putContainerEntity(
1268      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
1269      TimelineEntity entity)
1270      throws YarnException, IOException {
1271    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
1272      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
1273          currAttemptId.getApplicationId(),
1274          CONTAINER_ENTITY_GROUP_ID);
1275      return timelineClient.putEntities(currAttemptId, groupId, entity);
1276    } else {
1277      return timelineClient.putEntities(entity);
1278    }
1279  }
1280
1281  private void publishApplicationAttemptEvent(
1282      final TimelineClient timelineClient, String appAttemptId,
1283      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1284    final TimelineEntity entity = new TimelineEntity();
1285    entity.setEntityId(appAttemptId);
1286    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1287    entity.setDomainId(domainId);
1288    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1289    TimelineEvent event = new TimelineEvent();
1290    event.setEventType(appEvent.toString());
1291    event.setTimestamp(System.currentTimeMillis());
1292    entity.addEvent(event);
1293    try {
1294      TimelinePutResponse response = timelineClient.putEntities(entity);
1295      processTimelineResponseErrors(response);
1296    } catch (YarnException | IOException | ClientHandlerException e) {
1297      LOG.error("App Attempt "
1298          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1299          + " event could not be published for "
1300          + appAttemptID, e);
1301    }
1302  }
1303
1304  private TimelinePutResponse processTimelineResponseErrors(
1305      TimelinePutResponse response) {
1306    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
1307    if (errors.size() == 0) {
1308      LOG.debug("Timeline entities are successfully put");
1309    } else {
1310      for (TimelinePutResponse.TimelinePutError error : errors) {
1311        LOG.error(
1312            "Error when publishing entity [" + error.getEntityType() + ","
1313                + error.getEntityId() + "], server side error code: "
1314                + error.getErrorCode());
1315      }
1316    }
1317    return response;
1318  }
1319
1320  RMCallbackHandler getRMCallbackHandler() {
1321    return new RMCallbackHandler();
1322  }
1323
1324  @VisibleForTesting
1325  void setAmRMClient(AMRMClientAsync client) {
1326    this.amRMClient = client;
1327  }
1328
1329  @VisibleForTesting
1330  int getNumCompletedContainers() {
1331    return numCompletedContainers.get();
1332  }
1333
1334  @VisibleForTesting
1335  boolean getDone() {
1336    return done;
1337  }
1338
1339  @VisibleForTesting
1340  Thread createLaunchContainerThread(Container allocatedContainer,
1341      String shellId) {
1342    LaunchContainerRunnable runnableLaunchContainer =
1343        new LaunchContainerRunnable(allocatedContainer, containerListener,
1344            shellId);
1345    return new Thread(runnableLaunchContainer);
1346  }
1347
1348  private void publishContainerStartEventOnTimelineServiceV2(
1349      Container container) {
1350    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1351        entity =
1352            new org.apache.hadoop.yarn.api.records.timelineservice.
1353            TimelineEntity();
1354    entity.setId(container.getId().toString());
1355    entity.setType(DSEntity.DS_CONTAINER.toString());
1356    long ts = System.currentTimeMillis();
1357    entity.setCreatedTime(ts);
1358    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1359
1360    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1361        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1362    event.setTimestamp(ts);
1363    event.setId(DSEvent.DS_CONTAINER_START.toString());
1364    event.addInfo("Node", container.getNodeId().toString());
1365    event.addInfo("Resources", container.getResource().toString());
1366    entity.addEvent(event);
1367
1368    try {
1369      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1370        @Override
1371        public TimelinePutResponse run() throws Exception {
1372          timelineClient.putEntities(entity);
1373          return null;
1374        }
1375      });
1376    } catch (Exception e) {
1377      LOG.error("Container start event could not be published for "
1378          + container.getId().toString(),
1379          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1380    }
1381  }
1382
1383  private void publishContainerEndEventOnTimelineServiceV2(
1384      final ContainerStatus container) {
1385    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1386        entity =
1387            new org.apache.hadoop.yarn.api.records.timelineservice.
1388            TimelineEntity();
1389    entity.setId(container.getContainerId().toString());
1390    entity.setType(DSEntity.DS_CONTAINER.toString());
1391    //entity.setDomainId(domainId);
1392    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1393    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1394        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1395    event.setTimestamp(System.currentTimeMillis());
1396    event.setId(DSEvent.DS_CONTAINER_END.toString());
1397    event.addInfo("State", container.getState().name());
1398    event.addInfo("Exit Status", container.getExitStatus());
1399    entity.addEvent(event);
1400
1401    try {
1402      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1403        @Override
1404        public TimelinePutResponse run() throws Exception {
1405          timelineClient.putEntities(entity);
1406          return null;
1407        }
1408      });
1409    } catch (Exception e) {
1410      LOG.error("Container end event could not be published for "
1411          + container.getContainerId().toString(),
1412          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1413    }
1414  }
1415
1416  private void publishApplicationAttemptEventOnTimelineServiceV2(
1417      DSEvent appEvent) {
1418    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
1419        entity =
1420            new org.apache.hadoop.yarn.api.records.timelineservice.
1421            TimelineEntity();
1422    entity.setId(appAttemptID.toString());
1423    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
1424    long ts = System.currentTimeMillis();
1425    if (appEvent == DSEvent.DS_APP_ATTEMPT_START) {
1426      entity.setCreatedTime(ts);
1427    }
1428    entity.addInfo("user", appSubmitterUgi.getShortUserName());
1429    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
1430        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
1431    event.setId(appEvent.toString());
1432    event.setTimestamp(ts);
1433    entity.addEvent(event);
1434
1435    try {
1436      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
1437        @Override
1438        public TimelinePutResponse run() throws Exception {
1439          timelineClient.putEntitiesAsync(entity);
1440          return null;
1441        }
1442      });
1443    } catch (Exception e) {
1444      LOG.error("App Attempt "
1445          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1446          + " event could not be published for "
1447          + appAttemptID,
1448          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1449    }
1450  }
1451
1452}