001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Vector;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import org.apache.commons.cli.CommandLine;
043import org.apache.commons.cli.GnuParser;
044import org.apache.commons.cli.HelpFormatter;
045import org.apache.commons.cli.Options;
046import org.apache.commons.cli.ParseException;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.apache.hadoop.classification.InterfaceAudience;
050import org.apache.hadoop.classification.InterfaceAudience.Private;
051import org.apache.hadoop.classification.InterfaceStability;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.io.DataOutputBuffer;
056import org.apache.hadoop.io.IOUtils;
057import org.apache.hadoop.net.NetUtils;
058import org.apache.hadoop.security.Credentials;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.token.Token;
061import org.apache.hadoop.util.ExitUtil;
062import org.apache.hadoop.util.Shell;
063import org.apache.hadoop.yarn.api.ApplicationConstants;
064import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
065import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
066import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
067import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
068import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
069import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
070import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
071import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
072import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
073import org.apache.hadoop.yarn.api.records.Container;
074import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
075import org.apache.hadoop.yarn.api.records.ContainerId;
076import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
077import org.apache.hadoop.yarn.api.records.ContainerState;
078import org.apache.hadoop.yarn.api.records.ContainerStatus;
079import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
080import org.apache.hadoop.yarn.api.records.LocalResource;
081import org.apache.hadoop.yarn.api.records.LocalResourceType;
082import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
083import org.apache.hadoop.yarn.api.records.NodeReport;
084import org.apache.hadoop.yarn.api.records.Priority;
085import org.apache.hadoop.yarn.api.records.Resource;
086import org.apache.hadoop.yarn.api.records.ResourceRequest;
087import org.apache.hadoop.yarn.api.records.URL;
088import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
089import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
090import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
091import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
092import org.apache.hadoop.yarn.client.api.TimelineClient;
093import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
094import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
095import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
096import org.apache.hadoop.yarn.conf.YarnConfiguration;
097import org.apache.hadoop.yarn.exceptions.YarnException;
098import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
099import org.apache.hadoop.yarn.util.ConverterUtils;
100import org.apache.log4j.LogManager;
101
102import com.google.common.annotations.VisibleForTesting;
103
104/**
105 * An ApplicationMaster for executing shell commands on a set of launched
106 * containers using the YARN framework.
107 * 
108 * <p>
109 * This class is meant to act as an example on how to write yarn-based
110 * application masters.
111 * </p>
112 * 
113 * <p>
114 * The ApplicationMaster is started on a container by the
115 * <code>ResourceManager</code>'s launcher. The first thing that the
116 * <code>ApplicationMaster</code> needs to do is to connect and register itself
117 * with the <code>ResourceManager</code>. The registration sets up information
118 * within the <code>ResourceManager</code> regarding what host:port the
119 * ApplicationMaster is listening on to provide any form of functionality to a
120 * client as well as a tracking url that a client can use to keep track of
121 * status/job history if needed. However, in the distributedshell, trackingurl
122 * and appMasterHost:appMasterRpcPort are not supported.
123 * </p>
124 * 
125 * <p>
126 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
127 * <code>ResourceManager</code> at regular intervals to inform the
128 * <code>ResourceManager</code> that it is up and alive. The
129 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
130 * <code>ApplicationMaster</code> acts as a heartbeat.
131 * 
132 * <p>
133 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
134 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
135 * required no. of containers using {@link ResourceRequest} with the necessary
136 * resource specifications such as node location, computational
137 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
138 * responds with an {@link AllocateResponse} that informs the
139 * <code>ApplicationMaster</code> of the set of newly allocated containers,
140 * completed containers as well as current state of available resources.
141 * </p>
142 * 
143 * <p>
144 * For each allocated container, the <code>ApplicationMaster</code> can then set
145 * up the necessary launch context via {@link ContainerLaunchContext} to specify
146 * the allocated container id, local resources required by the executable, the
147 * environment to be setup for the executable, commands to execute, etc. and
148 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
149 * launch and execute the defined commands on the given allocated container.
150 * </p>
151 * 
152 * <p>
153 * The <code>ApplicationMaster</code> can monitor the launched container by
154 * either querying the <code>ResourceManager</code> using
155 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
156 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
157 * container's {@link ContainerId}.
158 *
159 * <p>
160 * After the job has been completed, the <code>ApplicationMaster</code> has to
161 * send a {@link FinishApplicationMasterRequest} to the
162 * <code>ResourceManager</code> to inform it that the
163 * <code>ApplicationMaster</code> has been completed.
164 */
165@InterfaceAudience.Public
166@InterfaceStability.Unstable
167public class ApplicationMaster {
168
169  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
170
171  @VisibleForTesting
172  @Private
173  public static enum DSEvent {
174    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
175  }
176  
177  @VisibleForTesting
178  @Private
179  public static enum DSEntity {
180    DS_APP_ATTEMPT, DS_CONTAINER
181  }
182
183  // Configuration
184  private Configuration conf;
185
186  // Handle to communicate with the Resource Manager
187  @SuppressWarnings("rawtypes")
188  private AMRMClientAsync amRMClient;
189
190  // In both secure and non-secure modes, this points to the job-submitter.
191  @VisibleForTesting
192  UserGroupInformation appSubmitterUgi;
193
194  // Handle to communicate with the Node Manager
195  private NMClientAsync nmClientAsync;
196  // Listen to process the response from the Node Manager
197  private NMCallbackHandler containerListener;
198
199  // Application Attempt Id ( combination of attemptId and fail count )
200  @VisibleForTesting
201  protected ApplicationAttemptId appAttemptID;
202
203  // TODO
204  // For status update for clients - yet to be implemented
205  // Hostname of the container
206  private String appMasterHostname = "";
207  // Port on which the app master listens for status updates from clients
208  private int appMasterRpcPort = -1;
209  // Tracking url to which app master publishes info for clients to monitor
210  private String appMasterTrackingUrl = "";
211
212  // App Master configuration
213  // No. of containers to run shell command on
214  @VisibleForTesting
215  protected int numTotalContainers = 1;
216  // Memory to request for the container on which the shell command will run
217  private int containerMemory = 10;
218  // VirtualCores to request for the container on which the shell command will run
219  private int containerVirtualCores = 1;
220  // Priority of the request
221  private int requestPriority;
222
223  // Counter for completed containers ( complete denotes successful or failed )
224  private AtomicInteger numCompletedContainers = new AtomicInteger();
225  // Allocated container count so that we know how many containers has the RM
226  // allocated to us
227  @VisibleForTesting
228  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
229  // Count of failed containers
230  private AtomicInteger numFailedContainers = new AtomicInteger();
231  // Count of containers already requested from the RM
232  // Needed as once requested, we should not request for containers again.
233  // Only request for more if the original requirement changes.
234  @VisibleForTesting
235  protected AtomicInteger numRequestedContainers = new AtomicInteger();
236
237  // Shell command to be executed
238  private String shellCommand = "";
239  // Args to be passed to the shell command
240  private String shellArgs = "";
241  // Env variables to be setup for the shell command
242  private Map<String, String> shellEnv = new HashMap<String, String>();
243
244  // Location of shell script ( obtained from info set in env )
245  // Shell script path in fs
246  private String scriptPath = "";
247  // Timestamp needed for creating a local resource
248  private long shellScriptPathTimestamp = 0;
249  // File length needed for local resource
250  private long shellScriptPathLen = 0;
251
252  // Timeline domain ID
253  private String domainId = null;
254
255  // Hardcoded path to shell script in launch container's local env
256  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
257  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
258      + ".bat";
259
260  // Hardcoded path to custom log_properties
261  private static final String log4jPath = "log4j.properties";
262
263  private static final String shellCommandPath = "shellCommands";
264  private static final String shellArgsPath = "shellArgs";
265
266  private volatile boolean done;
267
268  private ByteBuffer allTokens;
269
270  // Launch threads
271  private List<Thread> launchThreads = new ArrayList<Thread>();
272
273  // Timeline Client
274  @VisibleForTesting
275  TimelineClient timelineClient;
276
277  private final String linux_bash_command = "bash";
278  private final String windows_command = "cmd /c";
279
280  /**
281   * @param args Command line args
282   */
283  public static void main(String[] args) {
284    boolean result = false;
285    try {
286      ApplicationMaster appMaster = new ApplicationMaster();
287      LOG.info("Initializing ApplicationMaster");
288      boolean doRun = appMaster.init(args);
289      if (!doRun) {
290        System.exit(0);
291      }
292      appMaster.run();
293      result = appMaster.finish();
294    } catch (Throwable t) {
295      LOG.fatal("Error running ApplicationMaster", t);
296      LogManager.shutdown();
297      ExitUtil.terminate(1, t);
298    }
299    if (result) {
300      LOG.info("Application Master completed successfully. exiting");
301      System.exit(0);
302    } else {
303      LOG.info("Application Master failed. exiting");
304      System.exit(2);
305    }
306  }
307
308  /**
309   * Dump out contents of $CWD and the environment to stdout for debugging
310   */
311  private void dumpOutDebugInfo() {
312
313    LOG.info("Dump debug output");
314    Map<String, String> envs = System.getenv();
315    for (Map.Entry<String, String> env : envs.entrySet()) {
316      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
317      System.out.println("System env: key=" + env.getKey() + ", val="
318          + env.getValue());
319    }
320
321    BufferedReader buf = null;
322    try {
323      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
324        Shell.execCommand("ls", "-al");
325      buf = new BufferedReader(new StringReader(lines));
326      String line = "";
327      while ((line = buf.readLine()) != null) {
328        LOG.info("System CWD content: " + line);
329        System.out.println("System CWD content: " + line);
330      }
331    } catch (IOException e) {
332      e.printStackTrace();
333    } finally {
334      IOUtils.cleanup(LOG, buf);
335    }
336  }
337
/**
 * Creates an application master backed by a fresh
 * {@link YarnConfiguration}.
 */
public ApplicationMaster() {
  this.conf = new YarnConfiguration();
}
342
343  /**
344   * Parse command line options
345   *
346   * @param args Command line args
347   * @return Whether init successful and run should be invoked
348   * @throws ParseException
349   * @throws IOException
350   */
351  public boolean init(String[] args) throws ParseException, IOException {
352    Options opts = new Options();
353    opts.addOption("app_attempt_id", true,
354        "App Attempt ID. Not to be used unless for testing purposes");
355    opts.addOption("shell_env", true,
356        "Environment for shell script. Specified as env_key=env_val pairs");
357    opts.addOption("container_memory", true,
358        "Amount of memory in MB to be requested to run the shell command");
359    opts.addOption("container_vcores", true,
360        "Amount of virtual cores to be requested to run the shell command");
361    opts.addOption("num_containers", true,
362        "No. of containers on which the shell command needs to be executed");
363    opts.addOption("priority", true, "Application Priority. Default 0");
364    opts.addOption("debug", false, "Dump out debug information");
365
366    opts.addOption("help", false, "Print usage");
367    CommandLine cliParser = new GnuParser().parse(opts, args);
368
369    if (args.length == 0) {
370      printUsage(opts);
371      throw new IllegalArgumentException(
372          "No args specified for application master to initialize");
373    }
374
375    //Check whether customer log4j.properties file exists
376    if (fileExist(log4jPath)) {
377      try {
378        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
379            log4jPath);
380      } catch (Exception e) {
381        LOG.warn("Can not set up custom log4j properties. " + e);
382      }
383    }
384
385    if (cliParser.hasOption("help")) {
386      printUsage(opts);
387      return false;
388    }
389
390    if (cliParser.hasOption("debug")) {
391      dumpOutDebugInfo();
392    }
393
394    Map<String, String> envs = System.getenv();
395
396    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
397      if (cliParser.hasOption("app_attempt_id")) {
398        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
399        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
400      } else {
401        throw new IllegalArgumentException(
402            "Application Attempt Id not set in the environment");
403      }
404    } else {
405      ContainerId containerId = ConverterUtils.toContainerId(envs
406          .get(Environment.CONTAINER_ID.name()));
407      appAttemptID = containerId.getApplicationAttemptId();
408    }
409
410    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
411      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
412          + " not set in the environment");
413    }
414    if (!envs.containsKey(Environment.NM_HOST.name())) {
415      throw new RuntimeException(Environment.NM_HOST.name()
416          + " not set in the environment");
417    }
418    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
419      throw new RuntimeException(Environment.NM_HTTP_PORT
420          + " not set in the environment");
421    }
422    if (!envs.containsKey(Environment.NM_PORT.name())) {
423      throw new RuntimeException(Environment.NM_PORT.name()
424          + " not set in the environment");
425    }
426
427    LOG.info("Application master for app" + ", appId="
428        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
429        + appAttemptID.getApplicationId().getClusterTimestamp()
430        + ", attemptId=" + appAttemptID.getAttemptId());
431
432    if (!fileExist(shellCommandPath)
433        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
434      throw new IllegalArgumentException(
435          "No shell command or shell script specified to be executed by application master");
436    }
437
438    if (fileExist(shellCommandPath)) {
439      shellCommand = readContent(shellCommandPath);
440    }
441
442    if (fileExist(shellArgsPath)) {
443      shellArgs = readContent(shellArgsPath);
444    }
445
446    if (cliParser.hasOption("shell_env")) {
447      String shellEnvs[] = cliParser.getOptionValues("shell_env");
448      for (String env : shellEnvs) {
449        env = env.trim();
450        int index = env.indexOf('=');
451        if (index == -1) {
452          shellEnv.put(env, "");
453          continue;
454        }
455        String key = env.substring(0, index);
456        String val = "";
457        if (index < (env.length() - 1)) {
458          val = env.substring(index + 1);
459        }
460        shellEnv.put(key, val);
461      }
462    }
463
464    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
465      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
466
467      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
468        shellScriptPathTimestamp = Long.parseLong(envs
469            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
470      }
471      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
472        shellScriptPathLen = Long.parseLong(envs
473            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
474      }
475      if (!scriptPath.isEmpty()
476          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
477        LOG.error("Illegal values in env for shell script path" + ", path="
478            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
479            + shellScriptPathTimestamp);
480        throw new IllegalArgumentException(
481            "Illegal values in env for shell script path");
482      }
483    }
484
485    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
486      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
487    }
488
489    containerMemory = Integer.parseInt(cliParser.getOptionValue(
490        "container_memory", "10"));
491    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
492        "container_vcores", "1"));
493    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
494        "num_containers", "1"));
495    if (numTotalContainers == 0) {
496      throw new IllegalArgumentException(
497          "Cannot run distributed shell with no containers");
498    }
499    requestPriority = Integer.parseInt(cliParser
500        .getOptionValue("priority", "0"));
501    return true;
502  }
503
504  /**
505   * Helper function to print usage
506   *
507   * @param opts Parsed command line options
508   */
509  private void printUsage(Options opts) {
510    new HelpFormatter().printHelp("ApplicationMaster", opts);
511  }
512
513  /**
514   * Main run function for the application master
515   *
516   * @throws YarnException
517   * @throws IOException
518   */
519  @SuppressWarnings({ "unchecked" })
520  public void run() throws YarnException, IOException, InterruptedException {
521    LOG.info("Starting ApplicationMaster");
522
523    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
524    // are marked as LimitedPrivate
525    Credentials credentials =
526        UserGroupInformation.getCurrentUser().getCredentials();
527    DataOutputBuffer dob = new DataOutputBuffer();
528    credentials.writeTokenStorageToStream(dob);
529    // Now remove the AM->RM token so that containers cannot access it.
530    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
531    LOG.info("Executing with tokens:");
532    while (iter.hasNext()) {
533      Token<?> token = iter.next();
534      LOG.info(token);
535      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
536        iter.remove();
537      }
538    }
539    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
540
541    // Create appSubmitterUgi and add original tokens to it
542    String appSubmitterUserName =
543        System.getenv(ApplicationConstants.Environment.USER.name());
544    appSubmitterUgi =
545        UserGroupInformation.createRemoteUser(appSubmitterUserName);
546    appSubmitterUgi.addCredentials(credentials);
547
548
549    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
550    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
551    amRMClient.init(conf);
552    amRMClient.start();
553
554    containerListener = createNMCallbackHandler();
555    nmClientAsync = new NMClientAsyncImpl(containerListener);
556    nmClientAsync.init(conf);
557    nmClientAsync.start();
558
559    startTimelineClient(conf);
560    if(timelineClient != null) {
561      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
562          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
563    }
564
565    // Setup local RPC Server to accept status requests directly from clients
566    // TODO need to setup a protocol for client to be able to communicate to
567    // the RPC server
568    // TODO use the rpc port info to register with the RM for the client to
569    // send requests to this app master
570
571    // Register self with ResourceManager
572    // This will start heartbeating to the RM
573    appMasterHostname = NetUtils.getHostname();
574    RegisterApplicationMasterResponse response = amRMClient
575        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
576            appMasterTrackingUrl);
577    // Dump out information about cluster capability as seen by the
578    // resource manager
579    int maxMem = response.getMaximumResourceCapability().getMemory();
580    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
581    
582    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
583    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
584
585    // A resource ask cannot exceed the max.
586    if (containerMemory > maxMem) {
587      LOG.info("Container memory specified above max threshold of cluster."
588          + " Using max value." + ", specified=" + containerMemory + ", max="
589          + maxMem);
590      containerMemory = maxMem;
591    }
592
593    if (containerVirtualCores > maxVCores) {
594      LOG.info("Container virtual cores specified above max threshold of cluster."
595          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
596          + maxVCores);
597      containerVirtualCores = maxVCores;
598    }
599
600    List<Container> previousAMRunningContainers =
601        response.getContainersFromPreviousAttempts();
602    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
603      + " previous attempts' running containers on AM registration.");
604    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
605
606    int numTotalContainersToRequest =
607        numTotalContainers - previousAMRunningContainers.size();
608    // Setup ask for containers from RM
609    // Send request for containers to RM
610    // Until we get our fully allocated quota, we keep on polling RM for
611    // containers
612    // Keep looping until all the containers are launched and shell script
613    // executed on them ( regardless of success/failure).
614    for (int i = 0; i < numTotalContainersToRequest; ++i) {
615      ContainerRequest containerAsk = setupContainerAskForRM();
616      amRMClient.addContainerRequest(containerAsk);
617    }
618    numRequestedContainers.set(numTotalContainers);
619  }
620
621  @VisibleForTesting
622  void startTimelineClient(final Configuration conf)
623      throws YarnException, IOException, InterruptedException {
624    try {
625      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
626        @Override
627        public Void run() throws Exception {
628          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
629              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
630            // Creating the Timeline Client
631            timelineClient = TimelineClient.createTimelineClient();
632            timelineClient.init(conf);
633            timelineClient.start();
634          } else {
635            timelineClient = null;
636            LOG.warn("Timeline service is not enabled");
637          }
638          return null;
639        }
640      });
641    } catch (UndeclaredThrowableException e) {
642      throw new YarnException(e.getCause());
643    }
644  }
645
646  @VisibleForTesting
647  NMCallbackHandler createNMCallbackHandler() {
648    return new NMCallbackHandler(this);
649  }
650
651  @VisibleForTesting
652  protected boolean finish() {
653    // wait for completion.
654    while (!done
655        && (numCompletedContainers.get() != numTotalContainers)) {
656      try {
657        Thread.sleep(200);
658      } catch (InterruptedException ex) {}
659    }
660
661    if(timelineClient != null) {
662      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
663          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
664    }
665
666    // Join all launched threads
667    // needed for when we time out
668    // and we need to release containers
669    for (Thread launchThread : launchThreads) {
670      try {
671        launchThread.join(10000);
672      } catch (InterruptedException e) {
673        LOG.info("Exception thrown in thread join: " + e.getMessage());
674        e.printStackTrace();
675      }
676    }
677
678    // When the application completes, it should stop all running containers
679    LOG.info("Application completed. Stopping running containers");
680    nmClientAsync.stop();
681
682    // When the application completes, it should send a finish application
683    // signal to the RM
684    LOG.info("Application completed. Signalling finish to RM");
685
686    FinalApplicationStatus appStatus;
687    String appMessage = null;
688    boolean success = true;
689    if (numFailedContainers.get() == 0 && 
690        numCompletedContainers.get() == numTotalContainers) {
691      appStatus = FinalApplicationStatus.SUCCEEDED;
692    } else {
693      appStatus = FinalApplicationStatus.FAILED;
694      appMessage = "Diagnostics." + ", total=" + numTotalContainers
695          + ", completed=" + numCompletedContainers.get() + ", allocated="
696          + numAllocatedContainers.get() + ", failed="
697          + numFailedContainers.get();
698      LOG.info(appMessage);
699      success = false;
700    }
701    try {
702      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
703    } catch (YarnException ex) {
704      LOG.error("Failed to unregister application", ex);
705    } catch (IOException e) {
706      LOG.error("Failed to unregister application", e);
707    }
708    
709    amRMClient.stop();
710
711    // Stop Timeline Client
712    if(timelineClient != null) {
713      timelineClient.stop();
714    }
715
716    return success;
717  }
718  
719  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
720    @SuppressWarnings("unchecked")
721    @Override
722    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
723      LOG.info("Got response from RM for container ask, completedCnt="
724          + completedContainers.size());
725      for (ContainerStatus containerStatus : completedContainers) {
726        LOG.info(appAttemptID + " got container status for containerID="
727            + containerStatus.getContainerId() + ", state="
728            + containerStatus.getState() + ", exitStatus="
729            + containerStatus.getExitStatus() + ", diagnostics="
730            + containerStatus.getDiagnostics());
731
732        // non complete containers should not be here
733        assert (containerStatus.getState() == ContainerState.COMPLETE);
734
735        // increment counters for completed/failed containers
736        int exitStatus = containerStatus.getExitStatus();
737        if (0 != exitStatus) {
738          // container failed
739          if (ContainerExitStatus.ABORTED != exitStatus) {
740            // shell script failed
741            // counts as completed
742            numCompletedContainers.incrementAndGet();
743            numFailedContainers.incrementAndGet();
744          } else {
745            // container was killed by framework, possibly preempted
746            // we should re-try as the container was lost for some reason
747            numAllocatedContainers.decrementAndGet();
748            numRequestedContainers.decrementAndGet();
749            // we do not need to release the container as it would be done
750            // by the RM
751          }
752        } else {
753          // nothing to do
754          // container completed successfully
755          numCompletedContainers.incrementAndGet();
756          LOG.info("Container completed successfully." + ", containerId="
757              + containerStatus.getContainerId());
758        }
759        if(timelineClient != null) {
760          publishContainerEndEvent(
761              timelineClient, containerStatus, domainId, appSubmitterUgi);
762        }
763      }
764      
765      // ask for more containers if any failed
766      int askCount = numTotalContainers - numRequestedContainers.get();
767      numRequestedContainers.addAndGet(askCount);
768
769      if (askCount > 0) {
770        for (int i = 0; i < askCount; ++i) {
771          ContainerRequest containerAsk = setupContainerAskForRM();
772          amRMClient.addContainerRequest(containerAsk);
773        }
774      }
775      
776      if (numCompletedContainers.get() == numTotalContainers) {
777        done = true;
778      }
779    }
780
781    @Override
782    public void onContainersAllocated(List<Container> allocatedContainers) {
783      LOG.info("Got response from RM for container ask, allocatedCnt="
784          + allocatedContainers.size());
785      numAllocatedContainers.addAndGet(allocatedContainers.size());
786      for (Container allocatedContainer : allocatedContainers) {
787        LOG.info("Launching shell command on a new container."
788            + ", containerId=" + allocatedContainer.getId()
789            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
790            + ":" + allocatedContainer.getNodeId().getPort()
791            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
792            + ", containerResourceMemory"
793            + allocatedContainer.getResource().getMemory()
794            + ", containerResourceVirtualCores"
795            + allocatedContainer.getResource().getVirtualCores());
796        // + ", containerToken"
797        // +allocatedContainer.getContainerToken().getIdentifier().toString());
798
799        LaunchContainerRunnable runnableLaunchContainer =
800            new LaunchContainerRunnable(allocatedContainer, containerListener);
801        Thread launchThread = new Thread(runnableLaunchContainer);
802
803        // launch and start the container on a separate thread to keep
804        // the main thread unblocked
805        // as all containers may not be allocated at one go.
806        launchThreads.add(launchThread);
807        launchThread.start();
808      }
809    }
810
811    @Override
812    public void onShutdownRequest() {
813      done = true;
814    }
815
816    @Override
817    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
818
819    @Override
820    public float getProgress() {
821      // set progress to deliver to RM on next heartbeat
822      float progress = (float) numCompletedContainers.get()
823          / numTotalContainers;
824      return progress;
825    }
826
827    @Override
828    public void onError(Throwable e) {
829      done = true;
830      amRMClient.stop();
831    }
832  }
833
834  @VisibleForTesting
835  static class NMCallbackHandler
836    implements NMClientAsync.CallbackHandler {
837
838    private ConcurrentMap<ContainerId, Container> containers =
839        new ConcurrentHashMap<ContainerId, Container>();
840    private final ApplicationMaster applicationMaster;
841
842    public NMCallbackHandler(ApplicationMaster applicationMaster) {
843      this.applicationMaster = applicationMaster;
844    }
845
846    public void addContainer(ContainerId containerId, Container container) {
847      containers.putIfAbsent(containerId, container);
848    }
849
850    @Override
851    public void onContainerStopped(ContainerId containerId) {
852      if (LOG.isDebugEnabled()) {
853        LOG.debug("Succeeded to stop Container " + containerId);
854      }
855      containers.remove(containerId);
856    }
857
858    @Override
859    public void onContainerStatusReceived(ContainerId containerId,
860        ContainerStatus containerStatus) {
861      if (LOG.isDebugEnabled()) {
862        LOG.debug("Container Status: id=" + containerId + ", status=" +
863            containerStatus);
864      }
865    }
866
867    @Override
868    public void onContainerStarted(ContainerId containerId,
869        Map<String, ByteBuffer> allServiceResponse) {
870      if (LOG.isDebugEnabled()) {
871        LOG.debug("Succeeded to start Container " + containerId);
872      }
873      Container container = containers.get(containerId);
874      if (container != null) {
875        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
876      }
877      if(applicationMaster.timelineClient != null) {
878        ApplicationMaster.publishContainerStartEvent(
879            applicationMaster.timelineClient, container,
880            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
881      }
882    }
883
884    @Override
885    public void onStartContainerError(ContainerId containerId, Throwable t) {
886      LOG.error("Failed to start Container " + containerId);
887      containers.remove(containerId);
888      applicationMaster.numCompletedContainers.incrementAndGet();
889      applicationMaster.numFailedContainers.incrementAndGet();
890    }
891
892    @Override
893    public void onGetContainerStatusError(
894        ContainerId containerId, Throwable t) {
895      LOG.error("Failed to query the status of Container " + containerId);
896    }
897
898    @Override
899    public void onStopContainerError(ContainerId containerId, Throwable t) {
900      LOG.error("Failed to stop Container " + containerId);
901      containers.remove(containerId);
902    }
903  }
904
905  /**
906   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
907   * that will execute the shell command.
908   */
909  private class LaunchContainerRunnable implements Runnable {
910
911    // Allocated container
912    Container container;
913
914    NMCallbackHandler containerListener;
915
916    /**
917     * @param lcontainer Allocated container
918     * @param containerListener Callback handler of the container
919     */
920    public LaunchContainerRunnable(
921        Container lcontainer, NMCallbackHandler containerListener) {
922      this.container = lcontainer;
923      this.containerListener = containerListener;
924    }
925
926    @Override
927    /**
928     * Connects to CM, sets up container launch context 
929     * for shell command and eventually dispatches the container 
930     * start request to the CM. 
931     */
932    public void run() {
933      LOG.info("Setting up container launch container for containerid="
934          + container.getId());
935
936      // Set the local resources
937      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
938
939      // The container for the eventual shell commands needs its own local
940      // resources too.
941      // In this scenario, if a shell script is specified, we need to have it
942      // copied and made available to the container.
943      if (!scriptPath.isEmpty()) {
944        Path renamedScriptPath = null;
945        if (Shell.WINDOWS) {
946          renamedScriptPath = new Path(scriptPath + ".bat");
947        } else {
948          renamedScriptPath = new Path(scriptPath + ".sh");
949        }
950
951        try {
952          // rename the script file based on the underlying OS syntax.
953          renameScriptFile(renamedScriptPath);
954        } catch (Exception e) {
955          LOG.error(
956              "Not able to add suffix (.bat/.sh) to the shell script filename",
957              e);
958          // We know we cannot continue launching the container
959          // so we should release it.
960          numCompletedContainers.incrementAndGet();
961          numFailedContainers.incrementAndGet();
962          return;
963        }
964
965        URL yarnUrl = null;
966        try {
967          yarnUrl = ConverterUtils.getYarnUrlFromURI(
968            new URI(renamedScriptPath.toString()));
969        } catch (URISyntaxException e) {
970          LOG.error("Error when trying to use shell script path specified"
971              + " in env, path=" + renamedScriptPath, e);
972          // A failure scenario on bad input such as invalid shell script path
973          // We know we cannot continue launching the container
974          // so we should release it.
975          // TODO
976          numCompletedContainers.incrementAndGet();
977          numFailedContainers.incrementAndGet();
978          return;
979        }
980        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
981          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
982          shellScriptPathLen, shellScriptPathTimestamp);
983        localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
984            ExecShellStringPath, shellRsrc);
985        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
986      }
987
988      // Set the necessary command to execute on the allocated container
989      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
990
991      // Set executable command
992      vargs.add(shellCommand);
993      // Set shell script path
994      if (!scriptPath.isEmpty()) {
995        vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
996            : ExecShellStringPath);
997      }
998
999      // Set args for the shell command if any
1000      vargs.add(shellArgs);
1001      // Add log redirect params
1002      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1003      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1004
1005      // Get final commmand
1006      StringBuilder command = new StringBuilder();
1007      for (CharSequence str : vargs) {
1008        command.append(str).append(" ");
1009      }
1010
1011      List<String> commands = new ArrayList<String>();
1012      commands.add(command.toString());
1013
1014      // Set up ContainerLaunchContext, setting local resource, environment,
1015      // command and token for constructor.
1016
1017      // Note for tokens: Set up tokens for the container too. Today, for normal
1018      // shell commands, the container in distribute-shell doesn't need any
1019      // tokens. We are populating them mainly for NodeManagers to be able to
1020      // download anyfiles in the distributed file-system. The tokens are
1021      // otherwise also useful in cases, for e.g., when one is running a
1022      // "hadoop dfs" command inside the distributed shell.
1023      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1024        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
1025      containerListener.addContainer(container.getId(), container);
1026      nmClientAsync.startContainerAsync(container, ctx);
1027    }
1028  }
1029
1030  private void renameScriptFile(final Path renamedScriptPath)
1031      throws IOException, InterruptedException {
1032    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1033      @Override
1034      public Void run() throws IOException {
1035        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1036        fs.rename(new Path(scriptPath), renamedScriptPath);
1037        return null;
1038      }
1039    });
1040    LOG.info("User " + appSubmitterUgi.getUserName()
1041        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1042  }
1043
1044  /**
1045   * Setup the request that will be sent to the RM for the container ask.
1046   *
1047   * @return the setup ResourceRequest to be sent to RM
1048   */
1049  private ContainerRequest setupContainerAskForRM() {
1050    // setup requirements for hosts
1051    // using * as any host will do for the distributed shell app
1052    // set the priority for the request
1053    // TODO - what is the range for priority? how to decide?
1054    Priority pri = Priority.newInstance(requestPriority);
1055
1056    // Set up resource type requirements
1057    // For now, memory and CPU are supported so we set memory and cpu requirements
1058    Resource capability = Resource.newInstance(containerMemory,
1059      containerVirtualCores);
1060
1061    ContainerRequest request = new ContainerRequest(capability, null, null,
1062        pri);
1063    LOG.info("Requested container ask: " + request.toString());
1064    return request;
1065  }
1066
1067  private boolean fileExist(String filePath) {
1068    return new File(filePath).exists();
1069  }
1070
1071  private String readContent(String filePath) throws IOException {
1072    DataInputStream ds = null;
1073    try {
1074      ds = new DataInputStream(new FileInputStream(filePath));
1075      return ds.readUTF();
1076    } finally {
1077      org.apache.commons.io.IOUtils.closeQuietly(ds);
1078    }
1079  }
1080  
1081  private static void publishContainerStartEvent(
1082      final TimelineClient timelineClient, Container container, String domainId,
1083      UserGroupInformation ugi) {
1084    final TimelineEntity entity = new TimelineEntity();
1085    entity.setEntityId(container.getId().toString());
1086    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1087    entity.setDomainId(domainId);
1088    entity.addPrimaryFilter("user", ugi.getShortUserName());
1089    TimelineEvent event = new TimelineEvent();
1090    event.setTimestamp(System.currentTimeMillis());
1091    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1092    event.addEventInfo("Node", container.getNodeId().toString());
1093    event.addEventInfo("Resources", container.getResource().toString());
1094    entity.addEvent(event);
1095
1096    try {
1097      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1098        @Override
1099        public TimelinePutResponse run() throws Exception {
1100          return timelineClient.putEntities(entity);
1101        }
1102      });
1103    } catch (Exception e) {
1104      LOG.error("Container start event could not be published for "
1105          + container.getId().toString(),
1106          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1107    }
1108  }
1109
1110  private static void publishContainerEndEvent(
1111      final TimelineClient timelineClient, ContainerStatus container,
1112      String domainId, UserGroupInformation ugi) {
1113    final TimelineEntity entity = new TimelineEntity();
1114    entity.setEntityId(container.getContainerId().toString());
1115    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1116    entity.setDomainId(domainId);
1117    entity.addPrimaryFilter("user", ugi.getShortUserName());
1118    TimelineEvent event = new TimelineEvent();
1119    event.setTimestamp(System.currentTimeMillis());
1120    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1121    event.addEventInfo("State", container.getState().name());
1122    event.addEventInfo("Exit Status", container.getExitStatus());
1123    entity.addEvent(event);
1124    try {
1125      timelineClient.putEntities(entity);
1126    } catch (YarnException | IOException e) {
1127      LOG.error("Container end event could not be published for "
1128          + container.getContainerId().toString(), e);
1129    }
1130  }
1131
1132  private static void publishApplicationAttemptEvent(
1133      final TimelineClient timelineClient, String appAttemptId,
1134      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1135    final TimelineEntity entity = new TimelineEntity();
1136    entity.setEntityId(appAttemptId);
1137    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1138    entity.setDomainId(domainId);
1139    entity.addPrimaryFilter("user", ugi.getShortUserName());
1140    TimelineEvent event = new TimelineEvent();
1141    event.setEventType(appEvent.toString());
1142    event.setTimestamp(System.currentTimeMillis());
1143    entity.addEvent(event);
1144    try {
1145      timelineClient.putEntities(entity);
1146    } catch (YarnException | IOException e) {
1147      LOG.error("App Attempt "
1148          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1149          + " event could not be published for "
1150          + appAttemptId.toString(), e);
1151    }
1152  }
1153}