001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Vector;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import org.apache.commons.cli.CommandLine;
043import org.apache.commons.cli.GnuParser;
044import org.apache.commons.cli.HelpFormatter;
045import org.apache.commons.cli.Options;
046import org.apache.commons.cli.ParseException;
047import org.apache.commons.logging.Log;
048import org.apache.commons.logging.LogFactory;
049import org.apache.hadoop.classification.InterfaceAudience;
050import org.apache.hadoop.classification.InterfaceAudience.Private;
051import org.apache.hadoop.classification.InterfaceStability;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.io.DataOutputBuffer;
056import org.apache.hadoop.io.IOUtils;
057import org.apache.hadoop.net.NetUtils;
058import org.apache.hadoop.security.Credentials;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.token.Token;
061import org.apache.hadoop.util.ExitUtil;
062import org.apache.hadoop.util.Shell;
063import org.apache.hadoop.yarn.api.ApplicationConstants;
064import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
065import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
066import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
067import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
068import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
069import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
070import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
071import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
072import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
073import org.apache.hadoop.yarn.api.records.Container;
074import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
075import org.apache.hadoop.yarn.api.records.ContainerId;
076import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
077import org.apache.hadoop.yarn.api.records.ContainerState;
078import org.apache.hadoop.yarn.api.records.ContainerStatus;
079import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
080import org.apache.hadoop.yarn.api.records.LocalResource;
081import org.apache.hadoop.yarn.api.records.LocalResourceType;
082import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
083import org.apache.hadoop.yarn.api.records.NodeReport;
084import org.apache.hadoop.yarn.api.records.Priority;
085import org.apache.hadoop.yarn.api.records.Resource;
086import org.apache.hadoop.yarn.api.records.ResourceRequest;
087import org.apache.hadoop.yarn.api.records.URL;
088import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
089import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
090import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
091import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
092import org.apache.hadoop.yarn.client.api.TimelineClient;
093import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
094import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
095import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
096import org.apache.hadoop.yarn.conf.YarnConfiguration;
097import org.apache.hadoop.yarn.exceptions.YarnException;
098import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
099import org.apache.hadoop.yarn.util.ConverterUtils;
100import org.apache.log4j.LogManager;
101
102import com.google.common.annotations.VisibleForTesting;
103
104/**
105 * An ApplicationMaster for executing shell commands on a set of launched
106 * containers using the YARN framework.
107 * 
108 * <p>
109 * This class is meant to act as an example on how to write yarn-based
110 * application masters.
111 * </p>
112 * 
113 * <p>
114 * The ApplicationMaster is started on a container by the
115 * <code>ResourceManager</code>'s launcher. The first thing that the
116 * <code>ApplicationMaster</code> needs to do is to connect and register itself
117 * with the <code>ResourceManager</code>. The registration sets up information
118 * within the <code>ResourceManager</code> regarding what host:port the
119 * ApplicationMaster is listening on to provide any form of functionality to a
120 * client as well as a tracking url that a client can use to keep track of
121 * status/job history if needed. However, in the distributedshell, trackingurl
122 * and appMasterHost:appMasterRpcPort are not supported.
123 * </p>
124 * 
125 * <p>
126 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
127 * <code>ResourceManager</code> at regular intervals to inform the
128 * <code>ResourceManager</code> that it is up and alive. The
129 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
130 * <code>ApplicationMaster</code> acts as a heartbeat.
131 * 
132 * <p>
133 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
134 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
135 * required no. of containers using {@link ResourceRequest} with the necessary
136 * resource specifications such as node location, computational
137 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
138 * responds with an {@link AllocateResponse} that informs the
139 * <code>ApplicationMaster</code> of the set of newly allocated containers,
140 * completed containers as well as current state of available resources.
141 * </p>
142 * 
143 * <p>
144 * For each allocated container, the <code>ApplicationMaster</code> can then set
145 * up the necessary launch context via {@link ContainerLaunchContext} to specify
146 * the allocated container id, local resources required by the executable, the
147 * environment to be setup for the executable, commands to execute, etc. and
148 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
149 * launch and execute the defined commands on the given allocated container.
150 * </p>
151 * 
152 * <p>
153 * The <code>ApplicationMaster</code> can monitor the launched container by
154 * either querying the <code>ResourceManager</code> using
155 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
156 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
157 * container's {@link ContainerId}.
158 *
159 * <p>
160 * After the job has been completed, the <code>ApplicationMaster</code> has to
161 * send a {@link FinishApplicationMasterRequest} to the
162 * <code>ResourceManager</code> to inform it that the
163 * <code>ApplicationMaster</code> has been completed.
164 */
165@InterfaceAudience.Public
166@InterfaceStability.Unstable
167public class ApplicationMaster {
168
169  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
170
171  @VisibleForTesting
172  @Private
173  public static enum DSEvent {
174    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
175  }
176  
177  @VisibleForTesting
178  @Private
179  public static enum DSEntity {
180    DS_APP_ATTEMPT, DS_CONTAINER
181  }
182
183  // Configuration
184  private Configuration conf;
185
186  // Handle to communicate with the Resource Manager
187  @SuppressWarnings("rawtypes")
188  private AMRMClientAsync amRMClient;
189
190  // In both secure and non-secure modes, this points to the job-submitter.
191  private UserGroupInformation appSubmitterUgi;
192
193  // Handle to communicate with the Node Manager
194  private NMClientAsync nmClientAsync;
195  // Listen to process the response from the Node Manager
196  private NMCallbackHandler containerListener;
197  
198  // Application Attempt Id ( combination of attemptId and fail count )
199  @VisibleForTesting
200  protected ApplicationAttemptId appAttemptID;
201
202  // TODO
203  // For status update for clients - yet to be implemented
204  // Hostname of the container
205  private String appMasterHostname = "";
206  // Port on which the app master listens for status updates from clients
207  private int appMasterRpcPort = -1;
208  // Tracking url to which app master publishes info for clients to monitor
209  private String appMasterTrackingUrl = "";
210
211  // App Master configuration
212  // No. of containers to run shell command on
213  @VisibleForTesting
214  protected int numTotalContainers = 1;
215  // Memory to request for the container on which the shell command will run
216  private int containerMemory = 10;
217  // VirtualCores to request for the container on which the shell command will run
218  private int containerVirtualCores = 1;
219  // Priority of the request
220  private int requestPriority;
221
222  // Counter for completed containers ( complete denotes successful or failed )
223  private AtomicInteger numCompletedContainers = new AtomicInteger();
224  // Allocated container count so that we know how many containers has the RM
225  // allocated to us
226  @VisibleForTesting
227  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
228  // Count of failed containers
229  private AtomicInteger numFailedContainers = new AtomicInteger();
230  // Count of containers already requested from the RM
231  // Needed as once requested, we should not request for containers again.
232  // Only request for more if the original requirement changes.
233  @VisibleForTesting
234  protected AtomicInteger numRequestedContainers = new AtomicInteger();
235
236  // Shell command to be executed
237  private String shellCommand = "";
238  // Args to be passed to the shell command
239  private String shellArgs = "";
240  // Env variables to be setup for the shell command
241  private Map<String, String> shellEnv = new HashMap<String, String>();
242
243  // Location of shell script ( obtained from info set in env )
244  // Shell script path in fs
245  private String scriptPath = "";
246  // Timestamp needed for creating a local resource
247  private long shellScriptPathTimestamp = 0;
248  // File length needed for local resource
249  private long shellScriptPathLen = 0;
250
251  // Timeline domain ID
252  private String domainId = null;
253
254  // Hardcoded path to shell script in launch container's local env
255  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
256  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
257      + ".bat";
258
259  // Hardcoded path to custom log_properties
260  private static final String log4jPath = "log4j.properties";
261
262  private static final String shellCommandPath = "shellCommands";
263  private static final String shellArgsPath = "shellArgs";
264
265  private volatile boolean done;
266
267  private ByteBuffer allTokens;
268
269  // Launch threads
270  private List<Thread> launchThreads = new ArrayList<Thread>();
271
272  // Timeline Client
273  private TimelineClient timelineClient;
274
275  private final String linux_bash_command = "bash";
276  private final String windows_command = "cmd /c";
277
278  /**
279   * @param args Command line args
280   */
281  public static void main(String[] args) {
282    boolean result = false;
283    try {
284      ApplicationMaster appMaster = new ApplicationMaster();
285      LOG.info("Initializing ApplicationMaster");
286      boolean doRun = appMaster.init(args);
287      if (!doRun) {
288        System.exit(0);
289      }
290      appMaster.run();
291      result = appMaster.finish();
292    } catch (Throwable t) {
293      LOG.fatal("Error running ApplicationMaster", t);
294      LogManager.shutdown();
295      ExitUtil.terminate(1, t);
296    }
297    if (result) {
298      LOG.info("Application Master completed successfully. exiting");
299      System.exit(0);
300    } else {
301      LOG.info("Application Master failed. exiting");
302      System.exit(2);
303    }
304  }
305
306  /**
307   * Dump out contents of $CWD and the environment to stdout for debugging
308   */
309  private void dumpOutDebugInfo() {
310
311    LOG.info("Dump debug output");
312    Map<String, String> envs = System.getenv();
313    for (Map.Entry<String, String> env : envs.entrySet()) {
314      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
315      System.out.println("System env: key=" + env.getKey() + ", val="
316          + env.getValue());
317    }
318
319    BufferedReader buf = null;
320    try {
321      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
322        Shell.execCommand("ls", "-al");
323      buf = new BufferedReader(new StringReader(lines));
324      String line = "";
325      while ((line = buf.readLine()) != null) {
326        LOG.info("System CWD content: " + line);
327        System.out.println("System CWD content: " + line);
328      }
329    } catch (IOException e) {
330      e.printStackTrace();
331    } finally {
332      IOUtils.cleanup(LOG, buf);
333    }
334  }
335
336  public ApplicationMaster() {
337    // Set up the configuration
338    conf = new YarnConfiguration();
339  }
340
341  /**
342   * Parse command line options
343   *
344   * @param args Command line args
345   * @return Whether init successful and run should be invoked
346   * @throws ParseException
347   * @throws IOException
348   */
349  public boolean init(String[] args) throws ParseException, IOException {
350    Options opts = new Options();
351    opts.addOption("app_attempt_id", true,
352        "App Attempt ID. Not to be used unless for testing purposes");
353    opts.addOption("shell_env", true,
354        "Environment for shell script. Specified as env_key=env_val pairs");
355    opts.addOption("container_memory", true,
356        "Amount of memory in MB to be requested to run the shell command");
357    opts.addOption("container_vcores", true,
358        "Amount of virtual cores to be requested to run the shell command");
359    opts.addOption("num_containers", true,
360        "No. of containers on which the shell command needs to be executed");
361    opts.addOption("priority", true, "Application Priority. Default 0");
362    opts.addOption("debug", false, "Dump out debug information");
363
364    opts.addOption("help", false, "Print usage");
365    CommandLine cliParser = new GnuParser().parse(opts, args);
366
367    if (args.length == 0) {
368      printUsage(opts);
369      throw new IllegalArgumentException(
370          "No args specified for application master to initialize");
371    }
372
373    //Check whether customer log4j.properties file exists
374    if (fileExist(log4jPath)) {
375      try {
376        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
377            log4jPath);
378      } catch (Exception e) {
379        LOG.warn("Can not set up custom log4j properties. " + e);
380      }
381    }
382
383    if (cliParser.hasOption("help")) {
384      printUsage(opts);
385      return false;
386    }
387
388    if (cliParser.hasOption("debug")) {
389      dumpOutDebugInfo();
390    }
391
392    Map<String, String> envs = System.getenv();
393
394    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
395      if (cliParser.hasOption("app_attempt_id")) {
396        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
397        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
398      } else {
399        throw new IllegalArgumentException(
400            "Application Attempt Id not set in the environment");
401      }
402    } else {
403      ContainerId containerId = ConverterUtils.toContainerId(envs
404          .get(Environment.CONTAINER_ID.name()));
405      appAttemptID = containerId.getApplicationAttemptId();
406    }
407
408    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
409      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
410          + " not set in the environment");
411    }
412    if (!envs.containsKey(Environment.NM_HOST.name())) {
413      throw new RuntimeException(Environment.NM_HOST.name()
414          + " not set in the environment");
415    }
416    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
417      throw new RuntimeException(Environment.NM_HTTP_PORT
418          + " not set in the environment");
419    }
420    if (!envs.containsKey(Environment.NM_PORT.name())) {
421      throw new RuntimeException(Environment.NM_PORT.name()
422          + " not set in the environment");
423    }
424
425    LOG.info("Application master for app" + ", appId="
426        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
427        + appAttemptID.getApplicationId().getClusterTimestamp()
428        + ", attemptId=" + appAttemptID.getAttemptId());
429
430    if (!fileExist(shellCommandPath)
431        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
432      throw new IllegalArgumentException(
433          "No shell command or shell script specified to be executed by application master");
434    }
435
436    if (fileExist(shellCommandPath)) {
437      shellCommand = readContent(shellCommandPath);
438    }
439
440    if (fileExist(shellArgsPath)) {
441      shellArgs = readContent(shellArgsPath);
442    }
443
444    if (cliParser.hasOption("shell_env")) {
445      String shellEnvs[] = cliParser.getOptionValues("shell_env");
446      for (String env : shellEnvs) {
447        env = env.trim();
448        int index = env.indexOf('=');
449        if (index == -1) {
450          shellEnv.put(env, "");
451          continue;
452        }
453        String key = env.substring(0, index);
454        String val = "";
455        if (index < (env.length() - 1)) {
456          val = env.substring(index + 1);
457        }
458        shellEnv.put(key, val);
459      }
460    }
461
462    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
463      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
464
465      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
466        shellScriptPathTimestamp = Long.valueOf(envs
467            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
468      }
469      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
470        shellScriptPathLen = Long.valueOf(envs
471            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
472      }
473      if (!scriptPath.isEmpty()
474          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
475        LOG.error("Illegal values in env for shell script path" + ", path="
476            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
477            + shellScriptPathTimestamp);
478        throw new IllegalArgumentException(
479            "Illegal values in env for shell script path");
480      }
481    }
482
483    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
484      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
485    }
486
487    containerMemory = Integer.parseInt(cliParser.getOptionValue(
488        "container_memory", "10"));
489    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
490        "container_vcores", "1"));
491    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
492        "num_containers", "1"));
493    if (numTotalContainers == 0) {
494      throw new IllegalArgumentException(
495          "Cannot run distributed shell with no containers");
496    }
497    requestPriority = Integer.parseInt(cliParser
498        .getOptionValue("priority", "0"));
499
500    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
501      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
502      // Creating the Timeline Client
503      timelineClient = TimelineClient.createTimelineClient();
504      timelineClient.init(conf);
505      timelineClient.start();
506    } else {
507      timelineClient = null;
508      LOG.warn("Timeline service is not enabled");
509    }
510
511    return true;
512  }
513
514  /**
515   * Helper function to print usage
516   *
517   * @param opts Parsed command line options
518   */
519  private void printUsage(Options opts) {
520    new HelpFormatter().printHelp("ApplicationMaster", opts);
521  }
522
523  /**
524   * Main run function for the application master
525   *
526   * @throws YarnException
527   * @throws IOException
528   */
529  @SuppressWarnings({ "unchecked" })
530  public void run() throws YarnException, IOException {
531    LOG.info("Starting ApplicationMaster");
532
533    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
534    // are marked as LimitedPrivate
535    Credentials credentials =
536        UserGroupInformation.getCurrentUser().getCredentials();
537    DataOutputBuffer dob = new DataOutputBuffer();
538    credentials.writeTokenStorageToStream(dob);
539    // Now remove the AM->RM token so that containers cannot access it.
540    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
541    LOG.info("Executing with tokens:");
542    while (iter.hasNext()) {
543      Token<?> token = iter.next();
544      LOG.info(token);
545      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
546        iter.remove();
547      }
548    }
549    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
550
551    // Create appSubmitterUgi and add original tokens to it
552    String appSubmitterUserName =
553        System.getenv(ApplicationConstants.Environment.USER.name());
554    appSubmitterUgi =
555        UserGroupInformation.createRemoteUser(appSubmitterUserName);
556    appSubmitterUgi.addCredentials(credentials);
557    
558    if(timelineClient != null) {
559      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
560          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
561    }
562
563    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
564    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
565    amRMClient.init(conf);
566    amRMClient.start();
567
568    containerListener = createNMCallbackHandler();
569    nmClientAsync = new NMClientAsyncImpl(containerListener);
570    nmClientAsync.init(conf);
571    nmClientAsync.start();
572
573    // Setup local RPC Server to accept status requests directly from clients
574    // TODO need to setup a protocol for client to be able to communicate to
575    // the RPC server
576    // TODO use the rpc port info to register with the RM for the client to
577    // send requests to this app master
578
579    // Register self with ResourceManager
580    // This will start heartbeating to the RM
581    appMasterHostname = NetUtils.getHostname();
582    RegisterApplicationMasterResponse response = amRMClient
583        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
584            appMasterTrackingUrl);
585    // Dump out information about cluster capability as seen by the
586    // resource manager
587    int maxMem = response.getMaximumResourceCapability().getMemory();
588    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
589    
590    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
591    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
592
593    // A resource ask cannot exceed the max.
594    if (containerMemory > maxMem) {
595      LOG.info("Container memory specified above max threshold of cluster."
596          + " Using max value." + ", specified=" + containerMemory + ", max="
597          + maxMem);
598      containerMemory = maxMem;
599    }
600
601    if (containerVirtualCores > maxVCores) {
602      LOG.info("Container virtual cores specified above max threshold of cluster."
603          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
604          + maxVCores);
605      containerVirtualCores = maxVCores;
606    }
607
608    List<Container> previousAMRunningContainers =
609        response.getContainersFromPreviousAttempts();
610    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
611      + " previous attempts' running containers on AM registration.");
612    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
613
614    int numTotalContainersToRequest =
615        numTotalContainers - previousAMRunningContainers.size();
616    // Setup ask for containers from RM
617    // Send request for containers to RM
618    // Until we get our fully allocated quota, we keep on polling RM for
619    // containers
620    // Keep looping until all the containers are launched and shell script
621    // executed on them ( regardless of success/failure).
622    for (int i = 0; i < numTotalContainersToRequest; ++i) {
623      ContainerRequest containerAsk = setupContainerAskForRM();
624      amRMClient.addContainerRequest(containerAsk);
625    }
626    numRequestedContainers.set(numTotalContainers);
627
628    if(timelineClient != null) {
629      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
630          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
631    }
632  }
633
634  @VisibleForTesting
635  NMCallbackHandler createNMCallbackHandler() {
636    return new NMCallbackHandler(this);
637  }
638
639  @VisibleForTesting
640  protected boolean finish() {
641    // wait for completion.
642    while (!done
643        && (numCompletedContainers.get() != numTotalContainers)) {
644      try {
645        Thread.sleep(200);
646      } catch (InterruptedException ex) {}
647    }
648
649    // Join all launched threads
650    // needed for when we time out
651    // and we need to release containers
652    for (Thread launchThread : launchThreads) {
653      try {
654        launchThread.join(10000);
655      } catch (InterruptedException e) {
656        LOG.info("Exception thrown in thread join: " + e.getMessage());
657        e.printStackTrace();
658      }
659    }
660
661    // When the application completes, it should stop all running containers
662    LOG.info("Application completed. Stopping running containers");
663    nmClientAsync.stop();
664
665    // When the application completes, it should send a finish application
666    // signal to the RM
667    LOG.info("Application completed. Signalling finish to RM");
668
669    FinalApplicationStatus appStatus;
670    String appMessage = null;
671    boolean success = true;
672    if (numFailedContainers.get() == 0 && 
673        numCompletedContainers.get() == numTotalContainers) {
674      appStatus = FinalApplicationStatus.SUCCEEDED;
675    } else {
676      appStatus = FinalApplicationStatus.FAILED;
677      appMessage = "Diagnostics." + ", total=" + numTotalContainers
678          + ", completed=" + numCompletedContainers.get() + ", allocated="
679          + numAllocatedContainers.get() + ", failed="
680          + numFailedContainers.get();
681      LOG.info(appMessage);
682      success = false;
683    }
684    try {
685      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
686    } catch (YarnException ex) {
687      LOG.error("Failed to unregister application", ex);
688    } catch (IOException e) {
689      LOG.error("Failed to unregister application", e);
690    }
691    
692    amRMClient.stop();
693
694    // Stop Timeline Client
695    if(timelineClient != null) {
696      timelineClient.stop();
697    }
698
699    return success;
700  }
701  
702  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
703    @SuppressWarnings("unchecked")
704    @Override
705    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
706      LOG.info("Got response from RM for container ask, completedCnt="
707          + completedContainers.size());
708      for (ContainerStatus containerStatus : completedContainers) {
709        LOG.info(appAttemptID + " got container status for containerID="
710            + containerStatus.getContainerId() + ", state="
711            + containerStatus.getState() + ", exitStatus="
712            + containerStatus.getExitStatus() + ", diagnostics="
713            + containerStatus.getDiagnostics());
714
715        // non complete containers should not be here
716        assert (containerStatus.getState() == ContainerState.COMPLETE);
717
718        // increment counters for completed/failed containers
719        int exitStatus = containerStatus.getExitStatus();
720        if (0 != exitStatus) {
721          // container failed
722          if (ContainerExitStatus.ABORTED != exitStatus) {
723            // shell script failed
724            // counts as completed
725            numCompletedContainers.incrementAndGet();
726            numFailedContainers.incrementAndGet();
727          } else {
728            // container was killed by framework, possibly preempted
729            // we should re-try as the container was lost for some reason
730            numAllocatedContainers.decrementAndGet();
731            numRequestedContainers.decrementAndGet();
732            // we do not need to release the container as it would be done
733            // by the RM
734          }
735        } else {
736          // nothing to do
737          // container completed successfully
738          numCompletedContainers.incrementAndGet();
739          LOG.info("Container completed successfully." + ", containerId="
740              + containerStatus.getContainerId());
741        }
742        if(timelineClient != null) {
743          publishContainerEndEvent(
744              timelineClient, containerStatus, domainId, appSubmitterUgi);
745        }
746      }
747      
748      // ask for more containers if any failed
749      int askCount = numTotalContainers - numRequestedContainers.get();
750      numRequestedContainers.addAndGet(askCount);
751
752      if (askCount > 0) {
753        for (int i = 0; i < askCount; ++i) {
754          ContainerRequest containerAsk = setupContainerAskForRM();
755          amRMClient.addContainerRequest(containerAsk);
756        }
757      }
758      
759      if (numCompletedContainers.get() == numTotalContainers) {
760        done = true;
761      }
762    }
763
764    @Override
765    public void onContainersAllocated(List<Container> allocatedContainers) {
766      LOG.info("Got response from RM for container ask, allocatedCnt="
767          + allocatedContainers.size());
768      numAllocatedContainers.addAndGet(allocatedContainers.size());
769      for (Container allocatedContainer : allocatedContainers) {
770        LOG.info("Launching shell command on a new container."
771            + ", containerId=" + allocatedContainer.getId()
772            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
773            + ":" + allocatedContainer.getNodeId().getPort()
774            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
775            + ", containerResourceMemory"
776            + allocatedContainer.getResource().getMemory()
777            + ", containerResourceVirtualCores"
778            + allocatedContainer.getResource().getVirtualCores());
779        // + ", containerToken"
780        // +allocatedContainer.getContainerToken().getIdentifier().toString());
781
782        LaunchContainerRunnable runnableLaunchContainer =
783            new LaunchContainerRunnable(allocatedContainer, containerListener);
784        Thread launchThread = new Thread(runnableLaunchContainer);
785
786        // launch and start the container on a separate thread to keep
787        // the main thread unblocked
788        // as all containers may not be allocated at one go.
789        launchThreads.add(launchThread);
790        launchThread.start();
791      }
792    }
793
794    @Override
795    public void onShutdownRequest() {
796      done = true;
797    }
798
799    @Override
800    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
801
802    @Override
803    public float getProgress() {
804      // set progress to deliver to RM on next heartbeat
805      float progress = (float) numCompletedContainers.get()
806          / numTotalContainers;
807      return progress;
808    }
809
810    @Override
811    public void onError(Throwable e) {
812      done = true;
813      amRMClient.stop();
814    }
815  }
816
817  @VisibleForTesting
818  static class NMCallbackHandler
819    implements NMClientAsync.CallbackHandler {
820
821    private ConcurrentMap<ContainerId, Container> containers =
822        new ConcurrentHashMap<ContainerId, Container>();
823    private final ApplicationMaster applicationMaster;
824
825    public NMCallbackHandler(ApplicationMaster applicationMaster) {
826      this.applicationMaster = applicationMaster;
827    }
828
829    public void addContainer(ContainerId containerId, Container container) {
830      containers.putIfAbsent(containerId, container);
831    }
832
833    @Override
834    public void onContainerStopped(ContainerId containerId) {
835      if (LOG.isDebugEnabled()) {
836        LOG.debug("Succeeded to stop Container " + containerId);
837      }
838      containers.remove(containerId);
839    }
840
841    @Override
842    public void onContainerStatusReceived(ContainerId containerId,
843        ContainerStatus containerStatus) {
844      if (LOG.isDebugEnabled()) {
845        LOG.debug("Container Status: id=" + containerId + ", status=" +
846            containerStatus);
847      }
848    }
849
850    @Override
851    public void onContainerStarted(ContainerId containerId,
852        Map<String, ByteBuffer> allServiceResponse) {
853      if (LOG.isDebugEnabled()) {
854        LOG.debug("Succeeded to start Container " + containerId);
855      }
856      Container container = containers.get(containerId);
857      if (container != null) {
858        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
859      }
860      if(applicationMaster.timelineClient != null) {
861        ApplicationMaster.publishContainerStartEvent(
862            applicationMaster.timelineClient, container,
863            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
864      }
865    }
866
867    @Override
868    public void onStartContainerError(ContainerId containerId, Throwable t) {
869      LOG.error("Failed to start Container " + containerId);
870      containers.remove(containerId);
871      applicationMaster.numCompletedContainers.incrementAndGet();
872      applicationMaster.numFailedContainers.incrementAndGet();
873    }
874
875    @Override
876    public void onGetContainerStatusError(
877        ContainerId containerId, Throwable t) {
878      LOG.error("Failed to query the status of Container " + containerId);
879    }
880
881    @Override
882    public void onStopContainerError(ContainerId containerId, Throwable t) {
883      LOG.error("Failed to stop Container " + containerId);
884      containers.remove(containerId);
885    }
886  }
887
888  /**
889   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
890   * that will execute the shell command.
891   */
892  private class LaunchContainerRunnable implements Runnable {
893
894    // Allocated container
895    Container container;
896
897    NMCallbackHandler containerListener;
898
899    /**
900     * @param lcontainer Allocated container
901     * @param containerListener Callback handler of the container
902     */
903    public LaunchContainerRunnable(
904        Container lcontainer, NMCallbackHandler containerListener) {
905      this.container = lcontainer;
906      this.containerListener = containerListener;
907    }
908
909    @Override
910    /**
911     * Connects to CM, sets up container launch context 
912     * for shell command and eventually dispatches the container 
913     * start request to the CM. 
914     */
915    public void run() {
916      LOG.info("Setting up container launch container for containerid="
917          + container.getId());
918
919      // Set the local resources
920      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
921
922      // The container for the eventual shell commands needs its own local
923      // resources too.
924      // In this scenario, if a shell script is specified, we need to have it
925      // copied and made available to the container.
926      if (!scriptPath.isEmpty()) {
927        Path renamedScriptPath = null;
928        if (Shell.WINDOWS) {
929          renamedScriptPath = new Path(scriptPath + ".bat");
930        } else {
931          renamedScriptPath = new Path(scriptPath + ".sh");
932        }
933
934        try {
935          // rename the script file based on the underlying OS syntax.
936          renameScriptFile(renamedScriptPath);
937        } catch (Exception e) {
938          LOG.error(
939              "Not able to add suffix (.bat/.sh) to the shell script filename",
940              e);
941          // We know we cannot continue launching the container
942          // so we should release it.
943          numCompletedContainers.incrementAndGet();
944          numFailedContainers.incrementAndGet();
945          return;
946        }
947
948        URL yarnUrl = null;
949        try {
950          yarnUrl = ConverterUtils.getYarnUrlFromURI(
951            new URI(renamedScriptPath.toString()));
952        } catch (URISyntaxException e) {
953          LOG.error("Error when trying to use shell script path specified"
954              + " in env, path=" + renamedScriptPath, e);
955          // A failure scenario on bad input such as invalid shell script path
956          // We know we cannot continue launching the container
957          // so we should release it.
958          // TODO
959          numCompletedContainers.incrementAndGet();
960          numFailedContainers.incrementAndGet();
961          return;
962        }
963        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
964          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
965          shellScriptPathLen, shellScriptPathTimestamp);
966        localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
967            ExecShellStringPath, shellRsrc);
968        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
969      }
970
971      // Set the necessary command to execute on the allocated container
972      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
973
974      // Set executable command
975      vargs.add(shellCommand);
976      // Set shell script path
977      if (!scriptPath.isEmpty()) {
978        vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
979            : ExecShellStringPath);
980      }
981
982      // Set args for the shell command if any
983      vargs.add(shellArgs);
984      // Add log redirect params
985      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
986      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
987
988      // Get final commmand
989      StringBuilder command = new StringBuilder();
990      for (CharSequence str : vargs) {
991        command.append(str).append(" ");
992      }
993
994      List<String> commands = new ArrayList<String>();
995      commands.add(command.toString());
996
997      // Set up ContainerLaunchContext, setting local resource, environment,
998      // command and token for constructor.
999
1000      // Note for tokens: Set up tokens for the container too. Today, for normal
1001      // shell commands, the container in distribute-shell doesn't need any
1002      // tokens. We are populating them mainly for NodeManagers to be able to
1003      // download anyfiles in the distributed file-system. The tokens are
1004      // otherwise also useful in cases, for e.g., when one is running a
1005      // "hadoop dfs" command inside the distributed shell.
1006      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1007        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
1008      containerListener.addContainer(container.getId(), container);
1009      nmClientAsync.startContainerAsync(container, ctx);
1010    }
1011  }
1012
1013  private void renameScriptFile(final Path renamedScriptPath)
1014      throws IOException, InterruptedException {
1015    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1016      @Override
1017      public Void run() throws IOException {
1018        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1019        fs.rename(new Path(scriptPath), renamedScriptPath);
1020        return null;
1021      }
1022    });
1023    LOG.info("User " + appSubmitterUgi.getUserName()
1024        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1025  }
1026
1027  /**
1028   * Setup the request that will be sent to the RM for the container ask.
1029   *
1030   * @return the setup ResourceRequest to be sent to RM
1031   */
1032  private ContainerRequest setupContainerAskForRM() {
1033    // setup requirements for hosts
1034    // using * as any host will do for the distributed shell app
1035    // set the priority for the request
1036    // TODO - what is the range for priority? how to decide?
1037    Priority pri = Priority.newInstance(requestPriority);
1038
1039    // Set up resource type requirements
1040    // For now, memory and CPU are supported so we set memory and cpu requirements
1041    Resource capability = Resource.newInstance(containerMemory,
1042      containerVirtualCores);
1043
1044    ContainerRequest request = new ContainerRequest(capability, null, null,
1045        pri);
1046    LOG.info("Requested container ask: " + request.toString());
1047    return request;
1048  }
1049
1050  private boolean fileExist(String filePath) {
1051    return new File(filePath).exists();
1052  }
1053
1054  private String readContent(String filePath) throws IOException {
1055    DataInputStream ds = null;
1056    try {
1057      ds = new DataInputStream(new FileInputStream(filePath));
1058      return ds.readUTF();
1059    } finally {
1060      org.apache.commons.io.IOUtils.closeQuietly(ds);
1061    }
1062  }
1063  
1064  private static void publishContainerStartEvent(
1065      final TimelineClient timelineClient, Container container, String domainId,
1066      UserGroupInformation ugi) {
1067    final TimelineEntity entity = new TimelineEntity();
1068    entity.setEntityId(container.getId().toString());
1069    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1070    entity.setDomainId(domainId);
1071    entity.addPrimaryFilter("user", ugi.getShortUserName());
1072    TimelineEvent event = new TimelineEvent();
1073    event.setTimestamp(System.currentTimeMillis());
1074    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1075    event.addEventInfo("Node", container.getNodeId().toString());
1076    event.addEventInfo("Resources", container.getResource().toString());
1077    entity.addEvent(event);
1078
1079    try {
1080      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1081        @Override
1082        public TimelinePutResponse run() throws Exception {
1083          return timelineClient.putEntities(entity);
1084        }
1085      });
1086    } catch (Exception e) {
1087      LOG.error("Container start event could not be published for "
1088          + container.getId().toString(),
1089          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1090    }
1091  }
1092
1093  private static void publishContainerEndEvent(
1094      final TimelineClient timelineClient, ContainerStatus container,
1095      String domainId, UserGroupInformation ugi) {
1096    final TimelineEntity entity = new TimelineEntity();
1097    entity.setEntityId(container.getContainerId().toString());
1098    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1099    entity.setDomainId(domainId);
1100    entity.addPrimaryFilter("user", ugi.getShortUserName());
1101    TimelineEvent event = new TimelineEvent();
1102    event.setTimestamp(System.currentTimeMillis());
1103    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1104    event.addEventInfo("State", container.getState().name());
1105    event.addEventInfo("Exit Status", container.getExitStatus());
1106    entity.addEvent(event);
1107
1108    try {
1109      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1110        @Override
1111        public TimelinePutResponse run() throws Exception {
1112          return timelineClient.putEntities(entity);
1113        }
1114      });
1115    } catch (Exception e) {
1116      LOG.error("Container end event could not be published for "
1117          + container.getContainerId().toString(),
1118          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1119    }
1120  }
1121
1122  private static void publishApplicationAttemptEvent(
1123      final TimelineClient timelineClient, String appAttemptId,
1124      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1125    final TimelineEntity entity = new TimelineEntity();
1126    entity.setEntityId(appAttemptId);
1127    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1128    entity.setDomainId(domainId);
1129    entity.addPrimaryFilter("user", ugi.getShortUserName());
1130    TimelineEvent event = new TimelineEvent();
1131    event.setEventType(appEvent.toString());
1132    event.setTimestamp(System.currentTimeMillis());
1133    entity.addEvent(event);
1134
1135    try {
1136      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1137        @Override
1138        public TimelinePutResponse run() throws Exception {
1139          return timelineClient.putEntities(entity);
1140        }
1141      });
1142    } catch (Exception e) {
1143      LOG.error("App Attempt "
1144          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1145          + " event could not be published for "
1146          + appAttemptId.toString(),
1147          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1148    }
1149  }
1150}