001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.BufferedReader;
022    import java.io.DataInputStream;
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.IOException;
026    import java.io.StringReader;
027    import java.net.URI;
028    import java.net.URISyntaxException;
029    import java.nio.ByteBuffer;
030    import java.security.PrivilegedExceptionAction;
031    import java.util.ArrayList;
032    import java.util.HashMap;
033    import java.util.Iterator;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.Vector;
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.ConcurrentMap;
039    import java.util.concurrent.atomic.AtomicInteger;
040    
041    import org.apache.commons.cli.CommandLine;
042    import org.apache.commons.cli.GnuParser;
043    import org.apache.commons.cli.HelpFormatter;
044    import org.apache.commons.cli.Options;
045    import org.apache.commons.cli.ParseException;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    import org.apache.hadoop.classification.InterfaceAudience;
049    import org.apache.hadoop.classification.InterfaceAudience.Private;
050    import org.apache.hadoop.classification.InterfaceStability;
051    import org.apache.hadoop.conf.Configuration;
052    import org.apache.hadoop.fs.FileSystem;
053    import org.apache.hadoop.fs.Path;
054    import org.apache.hadoop.io.DataOutputBuffer;
055    import org.apache.hadoop.io.IOUtils;
056    import org.apache.hadoop.net.NetUtils;
057    import org.apache.hadoop.security.Credentials;
058    import org.apache.hadoop.security.UserGroupInformation;
059    import org.apache.hadoop.security.token.Token;
060    import org.apache.hadoop.util.ExitUtil;
061    import org.apache.hadoop.util.Shell;
062    import org.apache.hadoop.yarn.api.ApplicationConstants;
063    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
064    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
065    import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
066    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
067    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
068    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
069    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
070    import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
071    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
072    import org.apache.hadoop.yarn.api.records.Container;
073    import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
074    import org.apache.hadoop.yarn.api.records.ContainerId;
075    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
076    import org.apache.hadoop.yarn.api.records.ContainerState;
077    import org.apache.hadoop.yarn.api.records.ContainerStatus;
078    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
079    import org.apache.hadoop.yarn.api.records.LocalResource;
080    import org.apache.hadoop.yarn.api.records.LocalResourceType;
081    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
082    import org.apache.hadoop.yarn.api.records.NodeReport;
083    import org.apache.hadoop.yarn.api.records.Priority;
084    import org.apache.hadoop.yarn.api.records.Resource;
085    import org.apache.hadoop.yarn.api.records.ResourceRequest;
086    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
087    import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
088    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
089    import org.apache.hadoop.yarn.client.api.TimelineClient;
090    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
091    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
092    import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
093    import org.apache.hadoop.yarn.conf.YarnConfiguration;
094    import org.apache.hadoop.yarn.exceptions.YarnException;
095    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
096    import org.apache.hadoop.yarn.util.ConverterUtils;
097    import org.apache.hadoop.yarn.util.Records;
098    import org.apache.log4j.LogManager;
099    
100    import com.google.common.annotations.VisibleForTesting;
101    
102    /**
103     * An ApplicationMaster for executing shell commands on a set of launched
104     * containers using the YARN framework.
105     * 
106     * <p>
107     * This class is meant to act as an example on how to write yarn-based
108     * application masters.
109     * </p>
110     * 
111     * <p>
112     * The ApplicationMaster is started on a container by the
113     * <code>ResourceManager</code>'s launcher. The first thing that the
114     * <code>ApplicationMaster</code> needs to do is to connect and register itself
115     * with the <code>ResourceManager</code>. The registration sets up information
116     * within the <code>ResourceManager</code> regarding what host:port the
117     * ApplicationMaster is listening on to provide any form of functionality to a
118     * client as well as a tracking url that a client can use to keep track of
119     * status/job history if needed. However, in the distributedshell, trackingurl
120     * and appMasterHost:appMasterRpcPort are not supported.
121     * </p>
122     * 
123     * <p>
124     * The <code>ApplicationMaster</code> needs to send a heartbeat to the
125     * <code>ResourceManager</code> at regular intervals to inform the
126     * <code>ResourceManager</code> that it is up and alive. The
127     * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
128     * <code>ApplicationMaster</code> acts as a heartbeat.
129     * 
130     * <p>
131     * For the actual handling of the job, the <code>ApplicationMaster</code> has to
132     * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
133     * required no. of containers using {@link ResourceRequest} with the necessary
134     * resource specifications such as node location, computational
135     * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
136     * responds with an {@link AllocateResponse} that informs the
137     * <code>ApplicationMaster</code> of the set of newly allocated containers,
138     * completed containers as well as current state of available resources.
139     * </p>
140     * 
141     * <p>
142     * For each allocated container, the <code>ApplicationMaster</code> can then set
143     * up the necessary launch context via {@link ContainerLaunchContext} to specify
144     * the allocated container id, local resources required by the executable, the
145     * environment to be setup for the executable, commands to execute, etc. and
146     * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
147     * launch and execute the defined commands on the given allocated container.
148     * </p>
149     * 
150     * <p>
151     * The <code>ApplicationMaster</code> can monitor the launched container by
152     * either querying the <code>ResourceManager</code> using
153     * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
154     * the {@link ContainerManagementProtocol} by querying for the status of the allocated
155     * container's {@link ContainerId}.
156     *
157     * <p>
158     * After the job has been completed, the <code>ApplicationMaster</code> has to
159     * send a {@link FinishApplicationMasterRequest} to the
160     * <code>ResourceManager</code> to inform it that the
161     * <code>ApplicationMaster</code> has been completed.
162     */
163    @InterfaceAudience.Public
164    @InterfaceStability.Unstable
165    public class ApplicationMaster {
166    
167      private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
168    
169      @VisibleForTesting
170      @Private
171      public static enum DSEvent {
172        DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
173      }
174      
175      @VisibleForTesting
176      @Private
177      public static enum DSEntity {
178        DS_APP_ATTEMPT, DS_CONTAINER
179      }
180    
  // Configuration; created in the constructor and passed to init() of the
  // timeline, AM-RM and NM clients
  private Configuration conf;

  // Handle to communicate with the Resource Manager; created and started in
  // run()
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // Built in run() from the USER environment variable.
  private UserGroupInformation appSubmitterUgi;

  // Handle to communicate with the Node Manager; created and started in run()
  private NMClientAsync nmClientAsync;
  // Listen to process the response from the Node Manager; obtained from
  // createNMCallbackHandler()
  private NMCallbackHandler containerListener;
  
  // Application Attempt Id ( combination of attemptId and fail count ).
  // Derived in init() from the CONTAINER_ID environment variable, or from the
  // app_attempt_id option when that variable is absent.
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container; set in run() from NetUtils.getHostname()
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  // (left at -1 and passed as-is when registering with the RM)
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  // (left empty and passed as-is when registering with the RM)
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on (num_containers option)
  private int numTotalContainers = 1;
  // Memory, in MB, to request for the container on which the shell command
  // will run (container_memory option); capped in run() at the cluster maximum
  private int containerMemory = 10;
  // VirtualCores to request for the container on which the shell command will
  // run (container_vcores option); capped in run() at the cluster maximum
  private int containerVirtualCores = 1;
  // Priority of the request (priority option, default 0)
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed; read in init() from the shellCommands file
  // when that file exists
  private String shellCommand = "";
  // Args to be passed to the shell command; read in init() from the shellArgs
  // file when that file exists
  private String shellArgs = "";
  // Env variables to be setup for the shell command (shell_env option)
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Hardcoded path to shell script in launch container's local env
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log_properties; init() applies it only when the
  // file exists
  private static final String log4jPath = "log4j.properties";

  // Names of the files from which init() reads the shell command and its args
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Completion flag polled by finish(); the code that sets it is outside the
  // reviewed portion of the file
  private volatile boolean done;

  // Serialized credentials captured in run()
  private ByteBuffer allTokens;

  // Launch threads; finish() joins each of them with a timeout
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client; created and started in init()
  private TimelineClient timelineClient;

  // NOTE(review): not referenced in the reviewed portion of the file;
  // presumably the interpreter prefixes used when building the container
  // launch command on Linux and Windows respectively -- verify against usage
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";
271    
272      /**
273       * @param args Command line args
274       */
275      public static void main(String[] args) {
276        boolean result = false;
277        try {
278          ApplicationMaster appMaster = new ApplicationMaster();
279          LOG.info("Initializing ApplicationMaster");
280          boolean doRun = appMaster.init(args);
281          if (!doRun) {
282            System.exit(0);
283          }
284          appMaster.run();
285          result = appMaster.finish();
286        } catch (Throwable t) {
287          LOG.fatal("Error running ApplicationMaster", t);
288          LogManager.shutdown();
289          ExitUtil.terminate(1, t);
290        }
291        if (result) {
292          LOG.info("Application Master completed successfully. exiting");
293          System.exit(0);
294        } else {
295          LOG.info("Application Master failed. exiting");
296          System.exit(2);
297        }
298      }
299    
300      /**
301       * Dump out contents of $CWD and the environment to stdout for debugging
302       */
303      private void dumpOutDebugInfo() {
304    
305        LOG.info("Dump debug output");
306        Map<String, String> envs = System.getenv();
307        for (Map.Entry<String, String> env : envs.entrySet()) {
308          LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
309          System.out.println("System env: key=" + env.getKey() + ", val="
310              + env.getValue());
311        }
312    
313        BufferedReader buf = null;
314        try {
315          String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
316            Shell.execCommand("ls", "-al");
317          buf = new BufferedReader(new StringReader(lines));
318          String line = "";
319          while ((line = buf.readLine()) != null) {
320            LOG.info("System CWD content: " + line);
321            System.out.println("System CWD content: " + line);
322          }
323        } catch (IOException e) {
324          e.printStackTrace();
325        } finally {
326          IOUtils.cleanup(LOG, buf);
327        }
328      }
329    
330      public ApplicationMaster() {
331        // Set up the configuration
332        conf = new YarnConfiguration();
333      }
334    
335      /**
336       * Parse command line options
337       *
338       * @param args Command line args
339       * @return Whether init successful and run should be invoked
340       * @throws ParseException
341       * @throws IOException
342       */
343      public boolean init(String[] args) throws ParseException, IOException {
344        Options opts = new Options();
345        opts.addOption("app_attempt_id", true,
346            "App Attempt ID. Not to be used unless for testing purposes");
347        opts.addOption("shell_env", true,
348            "Environment for shell script. Specified as env_key=env_val pairs");
349        opts.addOption("container_memory", true,
350            "Amount of memory in MB to be requested to run the shell command");
351        opts.addOption("container_vcores", true,
352            "Amount of virtual cores to be requested to run the shell command");
353        opts.addOption("num_containers", true,
354            "No. of containers on which the shell command needs to be executed");
355        opts.addOption("priority", true, "Application Priority. Default 0");
356        opts.addOption("debug", false, "Dump out debug information");
357    
358        opts.addOption("help", false, "Print usage");
359        CommandLine cliParser = new GnuParser().parse(opts, args);
360    
361        if (args.length == 0) {
362          printUsage(opts);
363          throw new IllegalArgumentException(
364              "No args specified for application master to initialize");
365        }
366    
367        //Check whether customer log4j.properties file exists
368        if (fileExist(log4jPath)) {
369          try {
370            Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
371                log4jPath);
372          } catch (Exception e) {
373            LOG.warn("Can not set up custom log4j properties. " + e);
374          }
375        }
376    
377        if (cliParser.hasOption("help")) {
378          printUsage(opts);
379          return false;
380        }
381    
382        if (cliParser.hasOption("debug")) {
383          dumpOutDebugInfo();
384        }
385    
386        Map<String, String> envs = System.getenv();
387    
388        if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
389          if (cliParser.hasOption("app_attempt_id")) {
390            String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
391            appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
392          } else {
393            throw new IllegalArgumentException(
394                "Application Attempt Id not set in the environment");
395          }
396        } else {
397          ContainerId containerId = ConverterUtils.toContainerId(envs
398              .get(Environment.CONTAINER_ID.name()));
399          appAttemptID = containerId.getApplicationAttemptId();
400        }
401    
402        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
403          throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
404              + " not set in the environment");
405        }
406        if (!envs.containsKey(Environment.NM_HOST.name())) {
407          throw new RuntimeException(Environment.NM_HOST.name()
408              + " not set in the environment");
409        }
410        if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
411          throw new RuntimeException(Environment.NM_HTTP_PORT
412              + " not set in the environment");
413        }
414        if (!envs.containsKey(Environment.NM_PORT.name())) {
415          throw new RuntimeException(Environment.NM_PORT.name()
416              + " not set in the environment");
417        }
418    
419        LOG.info("Application master for app" + ", appId="
420            + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
421            + appAttemptID.getApplicationId().getClusterTimestamp()
422            + ", attemptId=" + appAttemptID.getAttemptId());
423    
424        if (!fileExist(shellCommandPath)
425            && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
426          throw new IllegalArgumentException(
427              "No shell command or shell script specified to be executed by application master");
428        }
429    
430        if (fileExist(shellCommandPath)) {
431          shellCommand = readContent(shellCommandPath);
432        }
433    
434        if (fileExist(shellArgsPath)) {
435          shellArgs = readContent(shellArgsPath);
436        }
437    
438        if (cliParser.hasOption("shell_env")) {
439          String shellEnvs[] = cliParser.getOptionValues("shell_env");
440          for (String env : shellEnvs) {
441            env = env.trim();
442            int index = env.indexOf('=');
443            if (index == -1) {
444              shellEnv.put(env, "");
445              continue;
446            }
447            String key = env.substring(0, index);
448            String val = "";
449            if (index < (env.length() - 1)) {
450              val = env.substring(index + 1);
451            }
452            shellEnv.put(key, val);
453          }
454        }
455    
456        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
457          scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
458    
459          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
460            shellScriptPathTimestamp = Long.valueOf(envs
461                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
462          }
463          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
464            shellScriptPathLen = Long.valueOf(envs
465                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
466          }
467    
468          if (!scriptPath.isEmpty()
469              && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
470            LOG.error("Illegal values in env for shell script path" + ", path="
471                + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
472                + shellScriptPathTimestamp);
473            throw new IllegalArgumentException(
474                "Illegal values in env for shell script path");
475          }
476        }
477    
478        containerMemory = Integer.parseInt(cliParser.getOptionValue(
479            "container_memory", "10"));
480        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
481            "container_vcores", "1"));
482        numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
483            "num_containers", "1"));
484        if (numTotalContainers == 0) {
485          throw new IllegalArgumentException(
486              "Cannot run distributed shell with no containers");
487        }
488        requestPriority = Integer.parseInt(cliParser
489            .getOptionValue("priority", "0"));
490    
491        // Creating the Timeline Client
492        timelineClient = TimelineClient.createTimelineClient();
493        timelineClient.init(conf);
494        timelineClient.start();
495    
496        return true;
497      }
498    
499      /**
500       * Helper function to print usage
501       *
502       * @param opts Parsed command line options
503       */
504      private void printUsage(Options opts) {
505        new HelpFormatter().printHelp("ApplicationMaster", opts);
506      }
507    
508      /**
509       * Main run function for the application master
510       *
511       * @throws YarnException
512       * @throws IOException
513       */
514      @SuppressWarnings({ "unchecked" })
515      public void run() throws YarnException, IOException {
516        LOG.info("Starting ApplicationMaster");
517        try {
518          publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
519              DSEvent.DS_APP_ATTEMPT_START);
520        } catch (Exception e) {
521          LOG.error("App Attempt start event coud not be pulished for "
522              + appAttemptID.toString(), e);
523        }
524    
525        Credentials credentials =
526            UserGroupInformation.getCurrentUser().getCredentials();
527        DataOutputBuffer dob = new DataOutputBuffer();
528        credentials.writeTokenStorageToStream(dob);
529        // Now remove the AM->RM token so that containers cannot access it.
530        Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
531        LOG.info("Executing with tokens:");
532        while (iter.hasNext()) {
533          Token<?> token = iter.next();
534          LOG.info(token);
535          if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
536            iter.remove();
537          }
538        }
539        allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
540    
541        // Create appSubmitterUgi and add original tokens to it
542        String appSubmitterUserName =
543            System.getenv(ApplicationConstants.Environment.USER.name());
544        appSubmitterUgi =
545            UserGroupInformation.createRemoteUser(appSubmitterUserName);
546        appSubmitterUgi.addCredentials(credentials);
547    
548        AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
549        amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
550        amRMClient.init(conf);
551        amRMClient.start();
552    
553        containerListener = createNMCallbackHandler();
554        nmClientAsync = new NMClientAsyncImpl(containerListener);
555        nmClientAsync.init(conf);
556        nmClientAsync.start();
557    
558        // Setup local RPC Server to accept status requests directly from clients
559        // TODO need to setup a protocol for client to be able to communicate to
560        // the RPC server
561        // TODO use the rpc port info to register with the RM for the client to
562        // send requests to this app master
563    
564        // Register self with ResourceManager
565        // This will start heartbeating to the RM
566        appMasterHostname = NetUtils.getHostname();
567        RegisterApplicationMasterResponse response = amRMClient
568            .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
569                appMasterTrackingUrl);
570        // Dump out information about cluster capability as seen by the
571        // resource manager
572        int maxMem = response.getMaximumResourceCapability().getMemory();
573        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
574        
575        int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
576        LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
577    
578        // A resource ask cannot exceed the max.
579        if (containerMemory > maxMem) {
580          LOG.info("Container memory specified above max threshold of cluster."
581              + " Using max value." + ", specified=" + containerMemory + ", max="
582              + maxMem);
583          containerMemory = maxMem;
584        }
585    
586        if (containerVirtualCores > maxVCores) {
587          LOG.info("Container virtual cores specified above max threshold of cluster."
588              + " Using max value." + ", specified=" + containerVirtualCores + ", max="
589              + maxVCores);
590          containerVirtualCores = maxVCores;
591        }
592    
593        List<Container> previousAMRunningContainers =
594            response.getContainersFromPreviousAttempts();
595        LOG.info("Received " + previousAMRunningContainers.size()
596            + " previous AM's running containers on AM registration.");
597        numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
598    
599        int numTotalContainersToRequest =
600            numTotalContainers - previousAMRunningContainers.size();
601        // Setup ask for containers from RM
602        // Send request for containers to RM
603        // Until we get our fully allocated quota, we keep on polling RM for
604        // containers
605        // Keep looping until all the containers are launched and shell script
606        // executed on them ( regardless of success/failure).
607        for (int i = 0; i < numTotalContainersToRequest; ++i) {
608          ContainerRequest containerAsk = setupContainerAskForRM();
609          amRMClient.addContainerRequest(containerAsk);
610        }
611        numRequestedContainers.set(numTotalContainersToRequest);
612        try {
613          publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
614              DSEvent.DS_APP_ATTEMPT_END);
615        } catch (Exception e) {
616          LOG.error("App Attempt start event coud not be pulished for "
617              + appAttemptID.toString(), e);
618        }
619      }
620    
621      @VisibleForTesting
622      NMCallbackHandler createNMCallbackHandler() {
623        return new NMCallbackHandler(this);
624      }
625    
626      @VisibleForTesting
627      protected boolean finish() {
628        // wait for completion.
629        while (!done
630            && (numCompletedContainers.get() != numTotalContainers)) {
631          try {
632            Thread.sleep(200);
633          } catch (InterruptedException ex) {}
634        }
635    
636        // Join all launched threads
637        // needed for when we time out
638        // and we need to release containers
639        for (Thread launchThread : launchThreads) {
640          try {
641            launchThread.join(10000);
642          } catch (InterruptedException e) {
643            LOG.info("Exception thrown in thread join: " + e.getMessage());
644            e.printStackTrace();
645          }
646        }
647    
648        // When the application completes, it should stop all running containers
649        LOG.info("Application completed. Stopping running containers");
650        nmClientAsync.stop();
651    
652        // When the application completes, it should send a finish application
653        // signal to the RM
654        LOG.info("Application completed. Signalling finish to RM");
655    
656        FinalApplicationStatus appStatus;
657        String appMessage = null;
658        boolean success = true;
659        if (numFailedContainers.get() == 0 && 
660            numCompletedContainers.get() == numTotalContainers) {
661          appStatus = FinalApplicationStatus.SUCCEEDED;
662        } else {
663          appStatus = FinalApplicationStatus.FAILED;
664          appMessage = "Diagnostics." + ", total=" + numTotalContainers
665              + ", completed=" + numCompletedContainers.get() + ", allocated="
666              + numAllocatedContainers.get() + ", failed="
667              + numFailedContainers.get();
668          success = false;
669        }
670        try {
671          amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
672        } catch (YarnException ex) {
673          LOG.error("Failed to unregister application", ex);
674        } catch (IOException e) {
675          LOG.error("Failed to unregister application", e);
676        }
677        
678        amRMClient.stop();
679    
680        return success;
681      }
682      
683      private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
684        @SuppressWarnings("unchecked")
685        @Override
686        public void onContainersCompleted(List<ContainerStatus> completedContainers) {
687          LOG.info("Got response from RM for container ask, completedCnt="
688              + completedContainers.size());
689          for (ContainerStatus containerStatus : completedContainers) {
690            LOG.info("Got container status for containerID="
691                + containerStatus.getContainerId() + ", state="
692                + containerStatus.getState() + ", exitStatus="
693                + containerStatus.getExitStatus() + ", diagnostics="
694                + containerStatus.getDiagnostics());
695    
696            // non complete containers should not be here
697            assert (containerStatus.getState() == ContainerState.COMPLETE);
698    
699            // increment counters for completed/failed containers
700            int exitStatus = containerStatus.getExitStatus();
701            if (0 != exitStatus) {
702              // container failed
703              if (ContainerExitStatus.ABORTED != exitStatus) {
704                // shell script failed
705                // counts as completed
706                numCompletedContainers.incrementAndGet();
707                numFailedContainers.incrementAndGet();
708              } else {
709                // container was killed by framework, possibly preempted
710                // we should re-try as the container was lost for some reason
711                numAllocatedContainers.decrementAndGet();
712                numRequestedContainers.decrementAndGet();
713                // we do not need to release the container as it would be done
714                // by the RM
715              }
716            } else {
717              // nothing to do
718              // container completed successfully
719              numCompletedContainers.incrementAndGet();
720              LOG.info("Container completed successfully." + ", containerId="
721                  + containerStatus.getContainerId());
722            }
723            try {
724              publishContainerEndEvent(timelineClient, containerStatus);
725            } catch (Exception e) {
726              LOG.error("Container start event could not be pulished for "
727                  + containerStatus.getContainerId().toString(), e);
728            }
729          }
730          
731          // ask for more containers if any failed
732          int askCount = numTotalContainers - numRequestedContainers.get();
733          numRequestedContainers.addAndGet(askCount);
734    
735          if (askCount > 0) {
736            for (int i = 0; i < askCount; ++i) {
737              ContainerRequest containerAsk = setupContainerAskForRM();
738              amRMClient.addContainerRequest(containerAsk);
739            }
740          }
741          
742          if (numCompletedContainers.get() == numTotalContainers) {
743            done = true;
744          }
745        }
746    
747        @Override
748        public void onContainersAllocated(List<Container> allocatedContainers) {
749          LOG.info("Got response from RM for container ask, allocatedCnt="
750              + allocatedContainers.size());
751          numAllocatedContainers.addAndGet(allocatedContainers.size());
752          for (Container allocatedContainer : allocatedContainers) {
753            LOG.info("Launching shell command on a new container."
754                + ", containerId=" + allocatedContainer.getId()
755                + ", containerNode=" + allocatedContainer.getNodeId().getHost()
756                + ":" + allocatedContainer.getNodeId().getPort()
757                + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
758                + ", containerResourceMemory"
759                + allocatedContainer.getResource().getMemory()
760                + ", containerResourceVirtualCores"
761                + allocatedContainer.getResource().getVirtualCores());
762            // + ", containerToken"
763            // +allocatedContainer.getContainerToken().getIdentifier().toString());
764    
765            LaunchContainerRunnable runnableLaunchContainer =
766                new LaunchContainerRunnable(allocatedContainer, containerListener);
767            Thread launchThread = new Thread(runnableLaunchContainer);
768    
769            // launch and start the container on a separate thread to keep
770            // the main thread unblocked
771            // as all containers may not be allocated at one go.
772            launchThreads.add(launchThread);
773            launchThread.start();
774          }
775        }
776    
777        @Override
778        public void onShutdownRequest() {
779          done = true;
780        }
781    
782        @Override
783        public void onNodesUpdated(List<NodeReport> updatedNodes) {}
784    
785        @Override
786        public float getProgress() {
787          // set progress to deliver to RM on next heartbeat
788          float progress = (float) numCompletedContainers.get()
789              / numTotalContainers;
790          return progress;
791        }
792    
793        @Override
794        public void onError(Throwable e) {
795          done = true;
796          amRMClient.stop();
797        }
798      }
799    
800      @VisibleForTesting
801      static class NMCallbackHandler
802        implements NMClientAsync.CallbackHandler {
803    
804        private ConcurrentMap<ContainerId, Container> containers =
805            new ConcurrentHashMap<ContainerId, Container>();
806        private final ApplicationMaster applicationMaster;
807    
808        public NMCallbackHandler(ApplicationMaster applicationMaster) {
809          this.applicationMaster = applicationMaster;
810        }
811    
812        public void addContainer(ContainerId containerId, Container container) {
813          containers.putIfAbsent(containerId, container);
814        }
815    
816        @Override
817        public void onContainerStopped(ContainerId containerId) {
818          if (LOG.isDebugEnabled()) {
819            LOG.debug("Succeeded to stop Container " + containerId);
820          }
821          containers.remove(containerId);
822        }
823    
824        @Override
825        public void onContainerStatusReceived(ContainerId containerId,
826            ContainerStatus containerStatus) {
827          if (LOG.isDebugEnabled()) {
828            LOG.debug("Container Status: id=" + containerId + ", status=" +
829                containerStatus);
830          }
831        }
832    
833        @Override
834        public void onContainerStarted(ContainerId containerId,
835            Map<String, ByteBuffer> allServiceResponse) {
836          if (LOG.isDebugEnabled()) {
837            LOG.debug("Succeeded to start Container " + containerId);
838          }
839          Container container = containers.get(containerId);
840          if (container != null) {
841            applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
842          }
843          try {
844            ApplicationMaster.publishContainerStartEvent(
845                applicationMaster.timelineClient, container);
846          } catch (Exception e) {
847            LOG.error("Container start event coud not be pulished for "
848                + container.getId().toString(), e);
849          }
850        }
851    
852        @Override
853        public void onStartContainerError(ContainerId containerId, Throwable t) {
854          LOG.error("Failed to start Container " + containerId);
855          containers.remove(containerId);
856          applicationMaster.numCompletedContainers.incrementAndGet();
857          applicationMaster.numFailedContainers.incrementAndGet();
858        }
859    
860        @Override
861        public void onGetContainerStatusError(
862            ContainerId containerId, Throwable t) {
863          LOG.error("Failed to query the status of Container " + containerId);
864        }
865    
866        @Override
867        public void onStopContainerError(ContainerId containerId, Throwable t) {
868          LOG.error("Failed to stop Container " + containerId);
869          containers.remove(containerId);
870        }
871      }
872    
873      /**
874       * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
875       * that will execute the shell command.
876       */
877      private class LaunchContainerRunnable implements Runnable {
878    
879        // Allocated container
880        Container container;
881    
882        NMCallbackHandler containerListener;
883    
884        /**
885         * @param lcontainer Allocated container
886         * @param containerListener Callback handler of the container
887         */
888        public LaunchContainerRunnable(
889            Container lcontainer, NMCallbackHandler containerListener) {
890          this.container = lcontainer;
891          this.containerListener = containerListener;
892        }
893    
894        @Override
895        /**
896         * Connects to CM, sets up container launch context 
897         * for shell command and eventually dispatches the container 
898         * start request to the CM. 
899         */
900        public void run() {
901          LOG.info("Setting up container launch container for containerid="
902              + container.getId());
903          ContainerLaunchContext ctx = Records
904              .newRecord(ContainerLaunchContext.class);
905    
906          // Set the environment
907          ctx.setEnvironment(shellEnv);
908    
909          // Set the local resources
910          Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
911    
912          // The container for the eventual shell commands needs its own local
913          // resources too.
914          // In this scenario, if a shell script is specified, we need to have it
915          // copied and made available to the container.
916          if (!scriptPath.isEmpty()) {
917            Path renamedScriptPath = null;
918            if (Shell.WINDOWS) {
919              renamedScriptPath = new Path(scriptPath + ".bat");
920            } else {
921              renamedScriptPath = new Path(scriptPath + ".sh");
922            }
923    
924            try {
925              // rename the script file based on the underlying OS syntax.
926              renameScriptFile(renamedScriptPath);
927            } catch (Exception e) {
928              LOG.error(
929                  "Not able to add suffix (.bat/.sh) to the shell script filename",
930                  e);
931              // We know we cannot continue launching the container
932              // so we should release it.
933              numCompletedContainers.incrementAndGet();
934              numFailedContainers.incrementAndGet();
935              return;
936            }
937    
938            LocalResource shellRsrc = Records.newRecord(LocalResource.class);
939            shellRsrc.setType(LocalResourceType.FILE);
940            shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
941            try {
942              shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
943                renamedScriptPath.toString())));
944            } catch (URISyntaxException e) {
945              LOG.error("Error when trying to use shell script path specified"
946                  + " in env, path=" + renamedScriptPath, e);
947    
948              // A failure scenario on bad input such as invalid shell script path
949              // We know we cannot continue launching the container
950              // so we should release it.
951              // TODO
952              numCompletedContainers.incrementAndGet();
953              numFailedContainers.incrementAndGet();
954              return;
955            }
956            shellRsrc.setTimestamp(shellScriptPathTimestamp);
957            shellRsrc.setSize(shellScriptPathLen);
958            localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
959                ExecShellStringPath, shellRsrc);
960            shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
961          }
962          ctx.setLocalResources(localResources);
963    
964          // Set the necessary command to execute on the allocated container
965          Vector<CharSequence> vargs = new Vector<CharSequence>(5);
966    
967          // Set executable command
968          vargs.add(shellCommand);
969          // Set shell script path
970          if (!scriptPath.isEmpty()) {
971            vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
972                : ExecShellStringPath);
973          }
974    
975          // Set args for the shell command if any
976          vargs.add(shellArgs);
977          // Add log redirect params
978          vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
979          vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
980    
981          // Get final commmand
982          StringBuilder command = new StringBuilder();
983          for (CharSequence str : vargs) {
984            command.append(str).append(" ");
985          }
986    
987          List<String> commands = new ArrayList<String>();
988          commands.add(command.toString());
989          ctx.setCommands(commands);
990    
991          // Set up tokens for the container too. Today, for normal shell commands,
992          // the container in distribute-shell doesn't need any tokens. We are
993          // populating them mainly for NodeManagers to be able to download any
994          // files in the distributed file-system. The tokens are otherwise also
995          // useful in cases, for e.g., when one is running a "hadoop dfs" command
996          // inside the distributed shell.
997          ctx.setTokens(allTokens.duplicate());
998    
999          containerListener.addContainer(container.getId(), container);
1000          nmClientAsync.startContainerAsync(container, ctx);
1001        }
1002      }
1003    
1004      private void renameScriptFile(final Path renamedScriptPath)
1005          throws IOException, InterruptedException {
1006        appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1007          @Override
1008          public Void run() throws IOException {
1009            FileSystem fs = renamedScriptPath.getFileSystem(conf);
1010            fs.rename(new Path(scriptPath), renamedScriptPath);
1011            return null;
1012          }
1013        });
1014        LOG.info("User " + appSubmitterUgi.getUserName()
1015            + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1016      }
1017    
1018      /**
1019       * Setup the request that will be sent to the RM for the container ask.
1020       *
1021       * @return the setup ResourceRequest to be sent to RM
1022       */
1023      private ContainerRequest setupContainerAskForRM() {
1024        // setup requirements for hosts
1025        // using * as any host will do for the distributed shell app
1026        // set the priority for the request
1027        Priority pri = Records.newRecord(Priority.class);
1028        // TODO - what is the range for priority? how to decide?
1029        pri.setPriority(requestPriority);
1030    
1031        // Set up resource type requirements
1032        // For now, memory and CPU are supported so we set memory and cpu requirements
1033        Resource capability = Records.newRecord(Resource.class);
1034        capability.setMemory(containerMemory);
1035        capability.setVirtualCores(containerVirtualCores);
1036    
1037        ContainerRequest request = new ContainerRequest(capability, null, null,
1038            pri);
1039        LOG.info("Requested container ask: " + request.toString());
1040        return request;
1041      }
1042    
1043      private boolean fileExist(String filePath) {
1044        return new File(filePath).exists();
1045      }
1046    
1047      private String readContent(String filePath) throws IOException {
1048        DataInputStream ds = null;
1049        try {
1050          ds = new DataInputStream(new FileInputStream(filePath));
1051          return ds.readUTF();
1052        } finally {
1053          org.apache.commons.io.IOUtils.closeQuietly(ds);
1054        }
1055      }
1056      
1057      private static void publishContainerStartEvent(TimelineClient timelineClient,
1058          Container container) throws IOException, YarnException {
1059        TimelineEntity entity = new TimelineEntity();
1060        entity.setEntityId(container.getId().toString());
1061        entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1062        entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1063            .toString());
1064        TimelineEvent event = new TimelineEvent();
1065        event.setTimestamp(System.currentTimeMillis());
1066        event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1067        event.addEventInfo("Node", container.getNodeId().toString());
1068        event.addEventInfo("Resources", container.getResource().toString());
1069        entity.addEvent(event);
1070    
1071        timelineClient.putEntities(entity);
1072      }
1073    
1074      private static void publishContainerEndEvent(TimelineClient timelineClient,
1075          ContainerStatus container) throws IOException, YarnException {
1076        TimelineEntity entity = new TimelineEntity();
1077        entity.setEntityId(container.getContainerId().toString());
1078        entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1079        entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1080            .toString());
1081        TimelineEvent event = new TimelineEvent();
1082        event.setTimestamp(System.currentTimeMillis());
1083        event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1084        event.addEventInfo("State", container.getState().name());
1085        event.addEventInfo("Exit Status", container.getExitStatus());
1086        entity.addEvent(event);
1087    
1088        timelineClient.putEntities(entity);
1089      }
1090    
1091      private static void publishApplicationAttemptEvent(
1092          TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
1093          throws IOException, YarnException {
1094        TimelineEntity entity = new TimelineEntity();
1095        entity.setEntityId(appAttemptId);
1096        entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1097        entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
1098            .toString());
1099        TimelineEvent event = new TimelineEvent();
1100        event.setEventType(appEvent.toString());
1101        event.setTimestamp(System.currentTimeMillis());
1102        entity.addEvent(event);
1103    
1104        timelineClient.putEntities(entity);
1105      }
1106    }