001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.BufferedReader;
022    import java.io.DataInputStream;
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.IOException;
026    import java.io.StringReader;
027    import java.lang.reflect.UndeclaredThrowableException;
028    import java.net.URI;
029    import java.net.URISyntaxException;
030    import java.nio.ByteBuffer;
031    import java.security.PrivilegedExceptionAction;
032    import java.util.ArrayList;
033    import java.util.HashMap;
034    import java.util.Iterator;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Vector;
038    import java.util.concurrent.ConcurrentHashMap;
039    import java.util.concurrent.ConcurrentMap;
040    import java.util.concurrent.atomic.AtomicInteger;
041    
042    import org.apache.commons.cli.CommandLine;
043    import org.apache.commons.cli.GnuParser;
044    import org.apache.commons.cli.HelpFormatter;
045    import org.apache.commons.cli.Options;
046    import org.apache.commons.cli.ParseException;
047    import org.apache.commons.logging.Log;
048    import org.apache.commons.logging.LogFactory;
049    import org.apache.hadoop.classification.InterfaceAudience;
050    import org.apache.hadoop.classification.InterfaceAudience.Private;
051    import org.apache.hadoop.classification.InterfaceStability;
052    import org.apache.hadoop.conf.Configuration;
053    import org.apache.hadoop.fs.FileSystem;
054    import org.apache.hadoop.fs.Path;
055    import org.apache.hadoop.io.DataOutputBuffer;
056    import org.apache.hadoop.io.IOUtils;
057    import org.apache.hadoop.net.NetUtils;
058    import org.apache.hadoop.security.Credentials;
059    import org.apache.hadoop.security.UserGroupInformation;
060    import org.apache.hadoop.security.token.Token;
061    import org.apache.hadoop.util.ExitUtil;
062    import org.apache.hadoop.util.Shell;
063    import org.apache.hadoop.yarn.api.ApplicationConstants;
064    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
065    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
066    import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
067    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
068    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
069    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
070    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
071    import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
072    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
073    import org.apache.hadoop.yarn.api.records.Container;
074    import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
075    import org.apache.hadoop.yarn.api.records.ContainerId;
076    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
077    import org.apache.hadoop.yarn.api.records.ContainerState;
078    import org.apache.hadoop.yarn.api.records.ContainerStatus;
079    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
080    import org.apache.hadoop.yarn.api.records.LocalResource;
081    import org.apache.hadoop.yarn.api.records.LocalResourceType;
082    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
083    import org.apache.hadoop.yarn.api.records.NodeReport;
084    import org.apache.hadoop.yarn.api.records.Priority;
085    import org.apache.hadoop.yarn.api.records.Resource;
086    import org.apache.hadoop.yarn.api.records.ResourceRequest;
087    import org.apache.hadoop.yarn.api.records.URL;
088    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
089    import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
090    import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
091    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
092    import org.apache.hadoop.yarn.client.api.TimelineClient;
093    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
094    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
095    import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
096    import org.apache.hadoop.yarn.conf.YarnConfiguration;
097    import org.apache.hadoop.yarn.exceptions.YarnException;
098    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
099    import org.apache.hadoop.yarn.util.ConverterUtils;
100    import org.apache.log4j.LogManager;
101    
102    import com.google.common.annotations.VisibleForTesting;
103    
104    /**
105     * An ApplicationMaster for executing shell commands on a set of launched
106     * containers using the YARN framework.
107     * 
108     * <p>
109     * This class is meant to act as an example on how to write yarn-based
110     * application masters.
111     * </p>
112     * 
113     * <p>
114     * The ApplicationMaster is started on a container by the
115     * <code>ResourceManager</code>'s launcher. The first thing that the
116     * <code>ApplicationMaster</code> needs to do is to connect and register itself
117     * with the <code>ResourceManager</code>. The registration sets up information
118     * within the <code>ResourceManager</code> regarding what host:port the
119     * ApplicationMaster is listening on to provide any form of functionality to a
120     * client as well as a tracking url that a client can use to keep track of
121     * status/job history if needed. However, in the distributedshell, trackingurl
122     * and appMasterHost:appMasterRpcPort are not supported.
123     * </p>
124     * 
125     * <p>
126     * The <code>ApplicationMaster</code> needs to send a heartbeat to the
127     * <code>ResourceManager</code> at regular intervals to inform the
128     * <code>ResourceManager</code> that it is up and alive. The
129     * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
130     * <code>ApplicationMaster</code> acts as a heartbeat.
131     * 
132     * <p>
133     * For the actual handling of the job, the <code>ApplicationMaster</code> has to
134     * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
135     * required no. of containers using {@link ResourceRequest} with the necessary
136     * resource specifications such as node location, computational
137     * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
138     * responds with an {@link AllocateResponse} that informs the
139     * <code>ApplicationMaster</code> of the set of newly allocated containers,
140     * completed containers as well as current state of available resources.
141     * </p>
142     * 
143     * <p>
144     * For each allocated container, the <code>ApplicationMaster</code> can then set
145     * up the necessary launch context via {@link ContainerLaunchContext} to specify
146     * the allocated container id, local resources required by the executable, the
147     * environment to be setup for the executable, commands to execute, etc. and
148     * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
149     * launch and execute the defined commands on the given allocated container.
150     * </p>
151     * 
152     * <p>
153     * The <code>ApplicationMaster</code> can monitor the launched container by
154     * either querying the <code>ResourceManager</code> using
155     * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
156     * the {@link ContainerManagementProtocol} by querying for the status of the allocated
157     * container's {@link ContainerId}.
158     *
159     * <p>
160     * After the job has been completed, the <code>ApplicationMaster</code> has to
161     * send a {@link FinishApplicationMasterRequest} to the
162     * <code>ResourceManager</code> to inform it that the
163     * <code>ApplicationMaster</code> has been completed.
164     */
165    @InterfaceAudience.Public
166    @InterfaceStability.Unstable
167    public class ApplicationMaster {
168    
  // Shared commons-logging logger for this class.
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
170    
171      @VisibleForTesting
172      @Private
173      public static enum DSEvent {
174        DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
175      }
176      
177      @VisibleForTesting
178      @Private
179      public static enum DSEntity {
180        DS_APP_ATTEMPT, DS_CONTAINER
181      }
182    
  // Configuration: a YarnConfiguration created in the constructor and passed
  // to the RM, NM and timeline clients.
  private Configuration conf;

  // Asynchronous handle to communicate with the Resource Manager; created and
  // started in run(). Declared with a raw type, hence the suppression.
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // Built in run() from the USER env var plus this process's credentials.
  private UserGroupInformation appSubmitterUgi;

  // Asynchronous handle to communicate with the Node Manager
  private NMClientAsync nmClientAsync;
  // Callback handler that processes responses from the Node Manager
  private NMCallbackHandler containerListener;
  
  // Attempt id of this AM. Derived in init() from the CONTAINER_ID env var,
  // or from the app_attempt_id option when that env var is absent.
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container; set in run() and sent to the RM on
  // registration.
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients;
  // -1 because no such RPC server is set up yet.
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor;
  // empty because tracking is not supported.
  private String appMasterTrackingUrl = "";

  // App Master configuration (overridable through init()'s command line)
  // No. of containers to run shell command on (num_containers, default 1)
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory in MB to request for each container on which the shell command
  // will run (container_memory); clamped to the cluster maximum in run().
  private int containerMemory = 10;
  // VirtualCores to request for each container (container_vcores); clamped to
  // the cluster maximum in run().
  private int containerVirtualCores = 1;
  // Priority of the request (priority option, default 0)
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed ).
  // Containers killed by the framework (ABORTED) are not counted here; they
  // are re-requested instead.
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us. Includes containers recovered from a previous attempt
  // and is decremented when an allocated container is aborted.
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of containers that exited with a non-zero status other than ABORTED
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes (or an aborted
  // container has to be replaced).
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed (read in init() from the shellCommands file)
  private String shellCommand = "";
  // Args to be passed to the shell command (read from the shellArgs file)
  private String shellArgs = "";
  // Env variables to be setup for the shell command (shell_env option)
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource; init() rejects values <= 0
  // when a script path is given.
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource; init() rejects values <= 0 when a
  // script path is given.
  private long shellScriptPathLen = 0;

  // Timeline domain ID (from the env; stays null when not provided)
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env.
  // NOTE(review): "ExecBatScripStringtPath" is misspelled; the name is kept
  // because it is presumably referenced elsewhere in this file.
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log_properties; init() applies it when the file
  // exists.
  private static final String log4jPath = "log4j.properties";

  // Files from which init() reads the shell command and its arguments.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // When set, finish() stops waiting for outstanding containers. Volatile
  // presumably because it is written from a callback thread - TODO confirm.
  private volatile boolean done;

  // Serialized credentials built in run(); presumably handed to launched
  // containers.
  private ByteBuffer allTokens;

  // Launch threads, joined (10 s each) in finish().
  // NOTE(review): plain ArrayList - if threads are added from a callback
  // thread while finish() iterates it, this needs synchronization; confirm.
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client, created and started in init()
  private TimelineClient timelineClient;

  // Command prefixes, presumably used to build container launch commands on
  // Linux and Windows respectively - TODO confirm against their users.
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";
277    
278      /**
279       * @param args Command line args
280       */
281      public static void main(String[] args) {
282        boolean result = false;
283        try {
284          ApplicationMaster appMaster = new ApplicationMaster();
285          LOG.info("Initializing ApplicationMaster");
286          boolean doRun = appMaster.init(args);
287          if (!doRun) {
288            System.exit(0);
289          }
290          appMaster.run();
291          result = appMaster.finish();
292        } catch (Throwable t) {
293          LOG.fatal("Error running ApplicationMaster", t);
294          LogManager.shutdown();
295          ExitUtil.terminate(1, t);
296        }
297        if (result) {
298          LOG.info("Application Master completed successfully. exiting");
299          System.exit(0);
300        } else {
301          LOG.info("Application Master failed. exiting");
302          System.exit(2);
303        }
304      }
305    
306      /**
307       * Dump out contents of $CWD and the environment to stdout for debugging
308       */
309      private void dumpOutDebugInfo() {
310    
311        LOG.info("Dump debug output");
312        Map<String, String> envs = System.getenv();
313        for (Map.Entry<String, String> env : envs.entrySet()) {
314          LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
315          System.out.println("System env: key=" + env.getKey() + ", val="
316              + env.getValue());
317        }
318    
319        BufferedReader buf = null;
320        try {
321          String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
322            Shell.execCommand("ls", "-al");
323          buf = new BufferedReader(new StringReader(lines));
324          String line = "";
325          while ((line = buf.readLine()) != null) {
326            LOG.info("System CWD content: " + line);
327            System.out.println("System CWD content: " + line);
328          }
329        } catch (IOException e) {
330          e.printStackTrace();
331        } finally {
332          IOUtils.cleanup(LOG, buf);
333        }
334      }
335    
336      public ApplicationMaster() {
337        // Set up the configuration
338        conf = new YarnConfiguration();
339      }
340    
341      /**
342       * Parse command line options
343       *
344       * @param args Command line args
345       * @return Whether init successful and run should be invoked
346       * @throws ParseException
347       * @throws IOException
348       */
349      public boolean init(String[] args) throws ParseException, IOException {
350        Options opts = new Options();
351        opts.addOption("app_attempt_id", true,
352            "App Attempt ID. Not to be used unless for testing purposes");
353        opts.addOption("shell_env", true,
354            "Environment for shell script. Specified as env_key=env_val pairs");
355        opts.addOption("container_memory", true,
356            "Amount of memory in MB to be requested to run the shell command");
357        opts.addOption("container_vcores", true,
358            "Amount of virtual cores to be requested to run the shell command");
359        opts.addOption("num_containers", true,
360            "No. of containers on which the shell command needs to be executed");
361        opts.addOption("priority", true, "Application Priority. Default 0");
362        opts.addOption("debug", false, "Dump out debug information");
363    
364        opts.addOption("help", false, "Print usage");
365        CommandLine cliParser = new GnuParser().parse(opts, args);
366    
367        if (args.length == 0) {
368          printUsage(opts);
369          throw new IllegalArgumentException(
370              "No args specified for application master to initialize");
371        }
372    
373        //Check whether customer log4j.properties file exists
374        if (fileExist(log4jPath)) {
375          try {
376            Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
377                log4jPath);
378          } catch (Exception e) {
379            LOG.warn("Can not set up custom log4j properties. " + e);
380          }
381        }
382    
383        if (cliParser.hasOption("help")) {
384          printUsage(opts);
385          return false;
386        }
387    
388        if (cliParser.hasOption("debug")) {
389          dumpOutDebugInfo();
390        }
391    
392        Map<String, String> envs = System.getenv();
393    
394        if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
395          if (cliParser.hasOption("app_attempt_id")) {
396            String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
397            appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
398          } else {
399            throw new IllegalArgumentException(
400                "Application Attempt Id not set in the environment");
401          }
402        } else {
403          ContainerId containerId = ConverterUtils.toContainerId(envs
404              .get(Environment.CONTAINER_ID.name()));
405          appAttemptID = containerId.getApplicationAttemptId();
406        }
407    
408        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
409          throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
410              + " not set in the environment");
411        }
412        if (!envs.containsKey(Environment.NM_HOST.name())) {
413          throw new RuntimeException(Environment.NM_HOST.name()
414              + " not set in the environment");
415        }
416        if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
417          throw new RuntimeException(Environment.NM_HTTP_PORT
418              + " not set in the environment");
419        }
420        if (!envs.containsKey(Environment.NM_PORT.name())) {
421          throw new RuntimeException(Environment.NM_PORT.name()
422              + " not set in the environment");
423        }
424    
425        LOG.info("Application master for app" + ", appId="
426            + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
427            + appAttemptID.getApplicationId().getClusterTimestamp()
428            + ", attemptId=" + appAttemptID.getAttemptId());
429    
430        if (!fileExist(shellCommandPath)
431            && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
432          throw new IllegalArgumentException(
433              "No shell command or shell script specified to be executed by application master");
434        }
435    
436        if (fileExist(shellCommandPath)) {
437          shellCommand = readContent(shellCommandPath);
438        }
439    
440        if (fileExist(shellArgsPath)) {
441          shellArgs = readContent(shellArgsPath);
442        }
443    
444        if (cliParser.hasOption("shell_env")) {
445          String shellEnvs[] = cliParser.getOptionValues("shell_env");
446          for (String env : shellEnvs) {
447            env = env.trim();
448            int index = env.indexOf('=');
449            if (index == -1) {
450              shellEnv.put(env, "");
451              continue;
452            }
453            String key = env.substring(0, index);
454            String val = "";
455            if (index < (env.length() - 1)) {
456              val = env.substring(index + 1);
457            }
458            shellEnv.put(key, val);
459          }
460        }
461    
462        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
463          scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
464    
465          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
466            shellScriptPathTimestamp = Long.valueOf(envs
467                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
468          }
469          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
470            shellScriptPathLen = Long.valueOf(envs
471                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
472          }
473          if (!scriptPath.isEmpty()
474              && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
475            LOG.error("Illegal values in env for shell script path" + ", path="
476                + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
477                + shellScriptPathTimestamp);
478            throw new IllegalArgumentException(
479                "Illegal values in env for shell script path");
480          }
481        }
482    
483        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
484          domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
485        }
486    
487        containerMemory = Integer.parseInt(cliParser.getOptionValue(
488            "container_memory", "10"));
489        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
490            "container_vcores", "1"));
491        numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
492            "num_containers", "1"));
493        if (numTotalContainers == 0) {
494          throw new IllegalArgumentException(
495              "Cannot run distributed shell with no containers");
496        }
497        requestPriority = Integer.parseInt(cliParser
498            .getOptionValue("priority", "0"));
499    
500        // Creating the Timeline Client
501        timelineClient = TimelineClient.createTimelineClient();
502        timelineClient.init(conf);
503        timelineClient.start();
504    
505        return true;
506      }
507    
508      /**
509       * Helper function to print usage
510       *
511       * @param opts Parsed command line options
512       */
513      private void printUsage(Options opts) {
514        new HelpFormatter().printHelp("ApplicationMaster", opts);
515      }
516    
517      /**
518       * Main run function for the application master
519       *
520       * @throws YarnException
521       * @throws IOException
522       */
523      @SuppressWarnings({ "unchecked" })
524      public void run() throws YarnException, IOException {
525        LOG.info("Starting ApplicationMaster");
526    
527        // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
528        // are marked as LimitedPrivate
529        Credentials credentials =
530            UserGroupInformation.getCurrentUser().getCredentials();
531        DataOutputBuffer dob = new DataOutputBuffer();
532        credentials.writeTokenStorageToStream(dob);
533        // Now remove the AM->RM token so that containers cannot access it.
534        Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
535        LOG.info("Executing with tokens:");
536        while (iter.hasNext()) {
537          Token<?> token = iter.next();
538          LOG.info(token);
539          if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
540            iter.remove();
541          }
542        }
543        allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
544    
545        // Create appSubmitterUgi and add original tokens to it
546        String appSubmitterUserName =
547            System.getenv(ApplicationConstants.Environment.USER.name());
548        appSubmitterUgi =
549            UserGroupInformation.createRemoteUser(appSubmitterUserName);
550        appSubmitterUgi.addCredentials(credentials);
551    
552        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
553            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
554    
555        AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
556        amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
557        amRMClient.init(conf);
558        amRMClient.start();
559    
560        containerListener = createNMCallbackHandler();
561        nmClientAsync = new NMClientAsyncImpl(containerListener);
562        nmClientAsync.init(conf);
563        nmClientAsync.start();
564    
565        // Setup local RPC Server to accept status requests directly from clients
566        // TODO need to setup a protocol for client to be able to communicate to
567        // the RPC server
568        // TODO use the rpc port info to register with the RM for the client to
569        // send requests to this app master
570    
571        // Register self with ResourceManager
572        // This will start heartbeating to the RM
573        appMasterHostname = NetUtils.getHostname();
574        RegisterApplicationMasterResponse response = amRMClient
575            .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
576                appMasterTrackingUrl);
577        // Dump out information about cluster capability as seen by the
578        // resource manager
579        int maxMem = response.getMaximumResourceCapability().getMemory();
580        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
581        
582        int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
583        LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
584    
585        // A resource ask cannot exceed the max.
586        if (containerMemory > maxMem) {
587          LOG.info("Container memory specified above max threshold of cluster."
588              + " Using max value." + ", specified=" + containerMemory + ", max="
589              + maxMem);
590          containerMemory = maxMem;
591        }
592    
593        if (containerVirtualCores > maxVCores) {
594          LOG.info("Container virtual cores specified above max threshold of cluster."
595              + " Using max value." + ", specified=" + containerVirtualCores + ", max="
596              + maxVCores);
597          containerVirtualCores = maxVCores;
598        }
599    
600        List<Container> previousAMRunningContainers =
601            response.getContainersFromPreviousAttempts();
602        LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
603          + " previous attempts' running containers on AM registration.");
604        numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
605    
606        int numTotalContainersToRequest =
607            numTotalContainers - previousAMRunningContainers.size();
608        // Setup ask for containers from RM
609        // Send request for containers to RM
610        // Until we get our fully allocated quota, we keep on polling RM for
611        // containers
612        // Keep looping until all the containers are launched and shell script
613        // executed on them ( regardless of success/failure).
614        for (int i = 0; i < numTotalContainersToRequest; ++i) {
615          ContainerRequest containerAsk = setupContainerAskForRM();
616          amRMClient.addContainerRequest(containerAsk);
617        }
618        numRequestedContainers.set(numTotalContainers);
619    
620        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
621            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
622      }
623    
624      @VisibleForTesting
625      NMCallbackHandler createNMCallbackHandler() {
626        return new NMCallbackHandler(this);
627      }
628    
629      @VisibleForTesting
630      protected boolean finish() {
631        // wait for completion.
632        while (!done
633            && (numCompletedContainers.get() != numTotalContainers)) {
634          try {
635            Thread.sleep(200);
636          } catch (InterruptedException ex) {}
637        }
638    
639        // Join all launched threads
640        // needed for when we time out
641        // and we need to release containers
642        for (Thread launchThread : launchThreads) {
643          try {
644            launchThread.join(10000);
645          } catch (InterruptedException e) {
646            LOG.info("Exception thrown in thread join: " + e.getMessage());
647            e.printStackTrace();
648          }
649        }
650    
651        // When the application completes, it should stop all running containers
652        LOG.info("Application completed. Stopping running containers");
653        nmClientAsync.stop();
654    
655        // When the application completes, it should send a finish application
656        // signal to the RM
657        LOG.info("Application completed. Signalling finish to RM");
658    
659        FinalApplicationStatus appStatus;
660        String appMessage = null;
661        boolean success = true;
662        if (numFailedContainers.get() == 0 && 
663            numCompletedContainers.get() == numTotalContainers) {
664          appStatus = FinalApplicationStatus.SUCCEEDED;
665        } else {
666          appStatus = FinalApplicationStatus.FAILED;
667          appMessage = "Diagnostics." + ", total=" + numTotalContainers
668              + ", completed=" + numCompletedContainers.get() + ", allocated="
669              + numAllocatedContainers.get() + ", failed="
670              + numFailedContainers.get();
671          LOG.info(appMessage);
672          success = false;
673        }
674        try {
675          amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
676        } catch (YarnException ex) {
677          LOG.error("Failed to unregister application", ex);
678        } catch (IOException e) {
679          LOG.error("Failed to unregister application", e);
680        }
681        
682        amRMClient.stop();
683    
684        return success;
685      }
686      
687      private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
688        @SuppressWarnings("unchecked")
689        @Override
690        public void onContainersCompleted(List<ContainerStatus> completedContainers) {
691          LOG.info("Got response from RM for container ask, completedCnt="
692              + completedContainers.size());
693          for (ContainerStatus containerStatus : completedContainers) {
694            LOG.info(appAttemptID + " got container status for containerID="
695                + containerStatus.getContainerId() + ", state="
696                + containerStatus.getState() + ", exitStatus="
697                + containerStatus.getExitStatus() + ", diagnostics="
698                + containerStatus.getDiagnostics());
699    
700            // non complete containers should not be here
701            assert (containerStatus.getState() == ContainerState.COMPLETE);
702    
703            // increment counters for completed/failed containers
704            int exitStatus = containerStatus.getExitStatus();
705            if (0 != exitStatus) {
706              // container failed
707              if (ContainerExitStatus.ABORTED != exitStatus) {
708                // shell script failed
709                // counts as completed
710                numCompletedContainers.incrementAndGet();
711                numFailedContainers.incrementAndGet();
712              } else {
713                // container was killed by framework, possibly preempted
714                // we should re-try as the container was lost for some reason
715                numAllocatedContainers.decrementAndGet();
716                numRequestedContainers.decrementAndGet();
717                // we do not need to release the container as it would be done
718                // by the RM
719              }
720            } else {
721              // nothing to do
722              // container completed successfully
723              numCompletedContainers.incrementAndGet();
724              LOG.info("Container completed successfully." + ", containerId="
725                  + containerStatus.getContainerId());
726            }
727            publishContainerEndEvent(
728                timelineClient, containerStatus, domainId, appSubmitterUgi);
729          }
730          
731          // ask for more containers if any failed
732          int askCount = numTotalContainers - numRequestedContainers.get();
733          numRequestedContainers.addAndGet(askCount);
734    
735          if (askCount > 0) {
736            for (int i = 0; i < askCount; ++i) {
737              ContainerRequest containerAsk = setupContainerAskForRM();
738              amRMClient.addContainerRequest(containerAsk);
739            }
740          }
741          
742          if (numCompletedContainers.get() == numTotalContainers) {
743            done = true;
744          }
745        }
746    
747        @Override
748        public void onContainersAllocated(List<Container> allocatedContainers) {
749          LOG.info("Got response from RM for container ask, allocatedCnt="
750              + allocatedContainers.size());
751          numAllocatedContainers.addAndGet(allocatedContainers.size());
752          for (Container allocatedContainer : allocatedContainers) {
753            LOG.info("Launching shell command on a new container."
754                + ", containerId=" + allocatedContainer.getId()
755                + ", containerNode=" + allocatedContainer.getNodeId().getHost()
756                + ":" + allocatedContainer.getNodeId().getPort()
757                + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
758                + ", containerResourceMemory"
759                + allocatedContainer.getResource().getMemory()
760                + ", containerResourceVirtualCores"
761                + allocatedContainer.getResource().getVirtualCores());
762            // + ", containerToken"
763            // +allocatedContainer.getContainerToken().getIdentifier().toString());
764    
765            LaunchContainerRunnable runnableLaunchContainer =
766                new LaunchContainerRunnable(allocatedContainer, containerListener);
767            Thread launchThread = new Thread(runnableLaunchContainer);
768    
769            // launch and start the container on a separate thread to keep
770            // the main thread unblocked
771            // as all containers may not be allocated at one go.
772            launchThreads.add(launchThread);
773            launchThread.start();
774          }
775        }
776    
777        @Override
778        public void onShutdownRequest() {
779          done = true;
780        }
781    
782        @Override
783        public void onNodesUpdated(List<NodeReport> updatedNodes) {}
784    
785        @Override
786        public float getProgress() {
787          // set progress to deliver to RM on next heartbeat
788          float progress = (float) numCompletedContainers.get()
789              / numTotalContainers;
790          return progress;
791        }
792    
793        @Override
794        public void onError(Throwable e) {
795          done = true;
796          amRMClient.stop();
797        }
798      }
799    
800      @VisibleForTesting
801      static class NMCallbackHandler
802        implements NMClientAsync.CallbackHandler {
803    
804        private ConcurrentMap<ContainerId, Container> containers =
805            new ConcurrentHashMap<ContainerId, Container>();
806        private final ApplicationMaster applicationMaster;
807    
808        public NMCallbackHandler(ApplicationMaster applicationMaster) {
809          this.applicationMaster = applicationMaster;
810        }
811    
812        public void addContainer(ContainerId containerId, Container container) {
813          containers.putIfAbsent(containerId, container);
814        }
815    
816        @Override
817        public void onContainerStopped(ContainerId containerId) {
818          if (LOG.isDebugEnabled()) {
819            LOG.debug("Succeeded to stop Container " + containerId);
820          }
821          containers.remove(containerId);
822        }
823    
824        @Override
825        public void onContainerStatusReceived(ContainerId containerId,
826            ContainerStatus containerStatus) {
827          if (LOG.isDebugEnabled()) {
828            LOG.debug("Container Status: id=" + containerId + ", status=" +
829                containerStatus);
830          }
831        }
832    
833        @Override
834        public void onContainerStarted(ContainerId containerId,
835            Map<String, ByteBuffer> allServiceResponse) {
836          if (LOG.isDebugEnabled()) {
837            LOG.debug("Succeeded to start Container " + containerId);
838          }
839          Container container = containers.get(containerId);
840          if (container != null) {
841            applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
842          }
843          ApplicationMaster.publishContainerStartEvent(
844              applicationMaster.timelineClient, container,
845              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
846        }
847    
848        @Override
849        public void onStartContainerError(ContainerId containerId, Throwable t) {
850          LOG.error("Failed to start Container " + containerId);
851          containers.remove(containerId);
852          applicationMaster.numCompletedContainers.incrementAndGet();
853          applicationMaster.numFailedContainers.incrementAndGet();
854        }
855    
856        @Override
857        public void onGetContainerStatusError(
858            ContainerId containerId, Throwable t) {
859          LOG.error("Failed to query the status of Container " + containerId);
860        }
861    
862        @Override
863        public void onStopContainerError(ContainerId containerId, Throwable t) {
864          LOG.error("Failed to stop Container " + containerId);
865          containers.remove(containerId);
866        }
867      }
868    
869      /**
870       * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
871       * that will execute the shell command.
872       */
873      private class LaunchContainerRunnable implements Runnable {
874    
875        // Allocated container
876        Container container;
877    
878        NMCallbackHandler containerListener;
879    
880        /**
881         * @param lcontainer Allocated container
882         * @param containerListener Callback handler of the container
883         */
884        public LaunchContainerRunnable(
885            Container lcontainer, NMCallbackHandler containerListener) {
886          this.container = lcontainer;
887          this.containerListener = containerListener;
888        }
889    
890        @Override
891        /**
892         * Connects to CM, sets up container launch context 
893         * for shell command and eventually dispatches the container 
894         * start request to the CM. 
895         */
896        public void run() {
897          LOG.info("Setting up container launch container for containerid="
898              + container.getId());
899    
900          // Set the local resources
901          Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
902    
903          // The container for the eventual shell commands needs its own local
904          // resources too.
905          // In this scenario, if a shell script is specified, we need to have it
906          // copied and made available to the container.
907          if (!scriptPath.isEmpty()) {
908            Path renamedScriptPath = null;
909            if (Shell.WINDOWS) {
910              renamedScriptPath = new Path(scriptPath + ".bat");
911            } else {
912              renamedScriptPath = new Path(scriptPath + ".sh");
913            }
914    
915            try {
916              // rename the script file based on the underlying OS syntax.
917              renameScriptFile(renamedScriptPath);
918            } catch (Exception e) {
919              LOG.error(
920                  "Not able to add suffix (.bat/.sh) to the shell script filename",
921                  e);
922              // We know we cannot continue launching the container
923              // so we should release it.
924              numCompletedContainers.incrementAndGet();
925              numFailedContainers.incrementAndGet();
926              return;
927            }
928    
929            URL yarnUrl = null;
930            try {
931              yarnUrl = ConverterUtils.getYarnUrlFromURI(
932                new URI(renamedScriptPath.toString()));
933            } catch (URISyntaxException e) {
934              LOG.error("Error when trying to use shell script path specified"
935                  + " in env, path=" + renamedScriptPath, e);
936              // A failure scenario on bad input such as invalid shell script path
937              // We know we cannot continue launching the container
938              // so we should release it.
939              // TODO
940              numCompletedContainers.incrementAndGet();
941              numFailedContainers.incrementAndGet();
942              return;
943            }
944            LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
945              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
946              shellScriptPathLen, shellScriptPathTimestamp);
947            localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
948                ExecShellStringPath, shellRsrc);
949            shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
950          }
951    
952          // Set the necessary command to execute on the allocated container
953          Vector<CharSequence> vargs = new Vector<CharSequence>(5);
954    
955          // Set executable command
956          vargs.add(shellCommand);
957          // Set shell script path
958          if (!scriptPath.isEmpty()) {
959            vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
960                : ExecShellStringPath);
961          }
962    
963          // Set args for the shell command if any
964          vargs.add(shellArgs);
965          // Add log redirect params
966          vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
967          vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
968    
969          // Get final commmand
970          StringBuilder command = new StringBuilder();
971          for (CharSequence str : vargs) {
972            command.append(str).append(" ");
973          }
974    
975          List<String> commands = new ArrayList<String>();
976          commands.add(command.toString());
977    
978          // Set up ContainerLaunchContext, setting local resource, environment,
979          // command and token for constructor.
980    
981          // Note for tokens: Set up tokens for the container too. Today, for normal
982          // shell commands, the container in distribute-shell doesn't need any
983          // tokens. We are populating them mainly for NodeManagers to be able to
984          // download anyfiles in the distributed file-system. The tokens are
985          // otherwise also useful in cases, for e.g., when one is running a
986          // "hadoop dfs" command inside the distributed shell.
987          ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
988            localResources, shellEnv, commands, null, allTokens.duplicate(), null);
989          containerListener.addContainer(container.getId(), container);
990          nmClientAsync.startContainerAsync(container, ctx);
991        }
992      }
993    
994      private void renameScriptFile(final Path renamedScriptPath)
995          throws IOException, InterruptedException {
996        appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
997          @Override
998          public Void run() throws IOException {
999            FileSystem fs = renamedScriptPath.getFileSystem(conf);
1000            fs.rename(new Path(scriptPath), renamedScriptPath);
1001            return null;
1002          }
1003        });
1004        LOG.info("User " + appSubmitterUgi.getUserName()
1005            + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1006      }
1007    
1008      /**
1009       * Setup the request that will be sent to the RM for the container ask.
1010       *
1011       * @return the setup ResourceRequest to be sent to RM
1012       */
1013      private ContainerRequest setupContainerAskForRM() {
1014        // setup requirements for hosts
1015        // using * as any host will do for the distributed shell app
1016        // set the priority for the request
1017        // TODO - what is the range for priority? how to decide?
1018        Priority pri = Priority.newInstance(requestPriority);
1019    
1020        // Set up resource type requirements
1021        // For now, memory and CPU are supported so we set memory and cpu requirements
1022        Resource capability = Resource.newInstance(containerMemory,
1023          containerVirtualCores);
1024    
1025        ContainerRequest request = new ContainerRequest(capability, null, null,
1026            pri);
1027        LOG.info("Requested container ask: " + request.toString());
1028        return request;
1029      }
1030    
1031      private boolean fileExist(String filePath) {
1032        return new File(filePath).exists();
1033      }
1034    
1035      private String readContent(String filePath) throws IOException {
1036        DataInputStream ds = null;
1037        try {
1038          ds = new DataInputStream(new FileInputStream(filePath));
1039          return ds.readUTF();
1040        } finally {
1041          org.apache.commons.io.IOUtils.closeQuietly(ds);
1042        }
1043      }
1044      
1045      private static void publishContainerStartEvent(
1046          final TimelineClient timelineClient, Container container, String domainId,
1047          UserGroupInformation ugi) {
1048        final TimelineEntity entity = new TimelineEntity();
1049        entity.setEntityId(container.getId().toString());
1050        entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1051        entity.setDomainId(domainId);
1052        entity.addPrimaryFilter("user", ugi.getShortUserName());
1053        TimelineEvent event = new TimelineEvent();
1054        event.setTimestamp(System.currentTimeMillis());
1055        event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1056        event.addEventInfo("Node", container.getNodeId().toString());
1057        event.addEventInfo("Resources", container.getResource().toString());
1058        entity.addEvent(event);
1059    
1060        try {
1061          ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1062            @Override
1063            public TimelinePutResponse run() throws Exception {
1064              return timelineClient.putEntities(entity);
1065            }
1066          });
1067        } catch (Exception e) {
1068          LOG.error("Container start event could not be published for "
1069              + container.getId().toString(),
1070              e instanceof UndeclaredThrowableException ? e.getCause() : e);
1071        }
1072      }
1073    
1074      private static void publishContainerEndEvent(
1075          final TimelineClient timelineClient, ContainerStatus container,
1076          String domainId, UserGroupInformation ugi) {
1077        final TimelineEntity entity = new TimelineEntity();
1078        entity.setEntityId(container.getContainerId().toString());
1079        entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1080        entity.setDomainId(domainId);
1081        entity.addPrimaryFilter("user", ugi.getShortUserName());
1082        TimelineEvent event = new TimelineEvent();
1083        event.setTimestamp(System.currentTimeMillis());
1084        event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1085        event.addEventInfo("State", container.getState().name());
1086        event.addEventInfo("Exit Status", container.getExitStatus());
1087        entity.addEvent(event);
1088    
1089        try {
1090          ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1091            @Override
1092            public TimelinePutResponse run() throws Exception {
1093              return timelineClient.putEntities(entity);
1094            }
1095          });
1096        } catch (Exception e) {
1097          LOG.error("Container end event could not be published for "
1098              + container.getContainerId().toString(),
1099              e instanceof UndeclaredThrowableException ? e.getCause() : e);
1100        }
1101      }
1102    
1103      private static void publishApplicationAttemptEvent(
1104          final TimelineClient timelineClient, String appAttemptId,
1105          DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1106        final TimelineEntity entity = new TimelineEntity();
1107        entity.setEntityId(appAttemptId);
1108        entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1109        entity.setDomainId(domainId);
1110        entity.addPrimaryFilter("user", ugi.getShortUserName());
1111        TimelineEvent event = new TimelineEvent();
1112        event.setEventType(appEvent.toString());
1113        event.setTimestamp(System.currentTimeMillis());
1114        entity.addEvent(event);
1115    
1116        try {
1117          ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1118            @Override
1119            public TimelinePutResponse run() throws Exception {
1120              return timelineClient.putEntities(entity);
1121            }
1122          });
1123        } catch (Exception e) {
1124          LOG.error("App Attempt "
1125              + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1126              + " event could not be published for "
1127              + appAttemptId.toString(),
1128              e instanceof UndeclaredThrowableException ? e.getCause() : e);
1129        }
1130      }
1131    }