001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.BufferedReader;
022    import java.io.DataInputStream;
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.IOException;
026    import java.io.StringReader;
027    import java.net.URI;
028    import java.net.URISyntaxException;
029    import java.nio.ByteBuffer;
030    import java.util.ArrayList;
031    import java.util.HashMap;
032    import java.util.Iterator;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.Vector;
036    import java.util.concurrent.ConcurrentHashMap;
037    import java.util.concurrent.ConcurrentMap;
038    import java.util.concurrent.atomic.AtomicInteger;
039    
040    import com.google.common.annotations.VisibleForTesting;
041    import org.apache.commons.cli.CommandLine;
042    import org.apache.commons.cli.GnuParser;
043    import org.apache.commons.cli.HelpFormatter;
044    import org.apache.commons.cli.Options;
045    import org.apache.commons.cli.ParseException;
046    import org.apache.commons.logging.Log;
047    import org.apache.commons.logging.LogFactory;
048    import org.apache.hadoop.classification.InterfaceAudience;
049    import org.apache.hadoop.classification.InterfaceStability;
050    import org.apache.hadoop.conf.Configuration;
051    import org.apache.hadoop.io.DataOutputBuffer;
052    import org.apache.hadoop.io.IOUtils;
053    import org.apache.hadoop.net.NetUtils;
054    import org.apache.hadoop.security.Credentials;
055    import org.apache.hadoop.security.UserGroupInformation;
056    import org.apache.hadoop.security.token.Token;
057    import org.apache.hadoop.util.Shell;
058    import org.apache.hadoop.yarn.api.ApplicationConstants;
059    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
060    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
061    import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
062    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
063    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
064    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
065    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
066    import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
067    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
068    import org.apache.hadoop.yarn.api.records.Container;
069    import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
070    import org.apache.hadoop.yarn.api.records.ContainerId;
071    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
072    import org.apache.hadoop.yarn.api.records.ContainerState;
073    import org.apache.hadoop.yarn.api.records.ContainerStatus;
074    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
075    import org.apache.hadoop.yarn.api.records.LocalResource;
076    import org.apache.hadoop.yarn.api.records.LocalResourceType;
077    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
078    import org.apache.hadoop.yarn.api.records.NodeReport;
079    import org.apache.hadoop.yarn.api.records.Priority;
080    import org.apache.hadoop.yarn.api.records.Resource;
081    import org.apache.hadoop.yarn.api.records.ResourceRequest;
082    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
083    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
084    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
085    import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
086    import org.apache.hadoop.yarn.conf.YarnConfiguration;
087    import org.apache.hadoop.yarn.exceptions.YarnException;
088    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
089    import org.apache.hadoop.yarn.util.ConverterUtils;
090    import org.apache.hadoop.yarn.util.Records;
091    
092    /**
093     * An ApplicationMaster for executing shell commands on a set of launched
094     * containers using the YARN framework.
095     * 
096     * <p>
097     * This class is meant to act as an example on how to write yarn-based
098     * application masters.
099     * </p>
100     * 
101     * <p>
102     * The ApplicationMaster is started on a container by the
103     * <code>ResourceManager</code>'s launcher. The first thing that the
104     * <code>ApplicationMaster</code> needs to do is to connect and register itself
105     * with the <code>ResourceManager</code>. The registration sets up information
106     * within the <code>ResourceManager</code> regarding what host:port the
107     * ApplicationMaster is listening on to provide any form of functionality to a
108     * client as well as a tracking url that a client can use to keep track of
109     * status/job history if needed. However, in the distributedshell, trackingurl
110     * and appMasterHost:appMasterRpcPort are not supported.
111     * </p>
112     * 
113     * <p>
114     * The <code>ApplicationMaster</code> needs to send a heartbeat to the
115     * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. The
 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code> from the
 * <code>ApplicationMaster</code> acts as a heartbeat.
 * </p>
119     * 
120     * <p>
121     * For the actual handling of the job, the <code>ApplicationMaster</code> has to
122     * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
124     * resource specifications such as node location, computational
125     * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
126     * responds with an {@link AllocateResponse} that informs the
127     * <code>ApplicationMaster</code> of the set of newly allocated containers,
128     * completed containers as well as current state of available resources.
129     * </p>
130     * 
131     * <p>
132     * For each allocated container, the <code>ApplicationMaster</code> can then set
133     * up the necessary launch context via {@link ContainerLaunchContext} to specify
134     * the allocated container id, local resources required by the executable, the
135     * environment to be setup for the executable, commands to execute, etc. and
136     * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
137     * launch and execute the defined commands on the given allocated container.
138     * </p>
139     * 
140     * <p>
141     * The <code>ApplicationMaster</code> can monitor the launched container by
142     * either querying the <code>ResourceManager</code> using
143     * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
144     * the {@link ContainerManagementProtocol} by querying for the status of the allocated
145     * container's {@link ContainerId}.
146     *
147     * <p>
148     * After the job has been completed, the <code>ApplicationMaster</code> has to
149     * send a {@link FinishApplicationMasterRequest} to the
150     * <code>ResourceManager</code> to inform it that the
151     * <code>ApplicationMaster</code> has been completed.
152     */
153    @InterfaceAudience.Public
154    @InterfaceStability.Unstable
155    public class ApplicationMaster {
156    
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  // YARN configuration. Created in the constructor and used to initialise
  // the RM and NM clients in run().
  private Configuration conf;

  // Asynchronous client used to talk to the ResourceManager; created in
  // run(). Declared with a raw type, hence the "rawtypes" suppression.
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // Asynchronous client used to talk to the NodeManagers; created in run().
  private NMClientAsync nmClientAsync;
  // Callback handler that processes the responses from the NodeManagers.
  private NMCallbackHandler containerListener;
  
  // Attempt id of this application master. Set in init() from the
  // CONTAINER_ID environment variable, or from -app_attempt_id when that
  // variable is absent.
  private ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container; set in run() and sent to the RM on
  // registration.
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients.
  // No RPC server is started, so -1 is what gets registered with the RM.
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor.
  // Not implemented; registered with the RM as an empty string.
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on (-num_containers, default 1).
  private int numTotalContainers = 1;
  // Memory in MB to request for each container running the shell command
  // (-container_memory, default 10). Clamped to the cluster maximum in run().
  private int containerMemory = 10;
  // Virtual cores to request for each container running the shell command
  // (-container_vcores, default 1). Clamped to the cluster maximum in run().
  private int containerVirtualCores = 1;
  // Priority of the container requests (-priority, default 0).
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed );
  // containers ABORTED by the framework are not counted here.
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us. Decremented again when a container is ABORTED.
  private AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of containers whose shell command exited with a non-zero status
  // (other than ABORTED).
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes; an ABORTED
  // container decrements this so that a replacement gets requested.
  private AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed (read in init() from the "shellCommands" file).
  private String shellCommand = "";
  // Args to be passed to the shell command (read in init() from the
  // "shellArgs" file).
  private String shellArgs = "";
  // Env variables to be setup for the shell command, parsed from the
  // -shell_env key=value options.
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String shellScriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Hardcoded path to shell script in launch container's local env.
  // NOTE(review): "ExecBatScripStringtPath" is misspelled; renaming it needs
  // every usage (outside this view) updated at the same time.
  private static final String ExecShellStringPath = "ExecShellScript.sh";
  private static final String ExecBatScripStringtPath = "ExecBatScript.bat";

  // Hardcoded path to custom log_properties; applied in init() if present.
  private static final String log4jPath = "log4j.properties";

  // Names of the files from which init() reads the shell command and args.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Set by the RM callbacks to stop the polling loop in run().
  private volatile boolean done;
  // Outcome computed in finish() and returned by run().
  private volatile boolean success;

  // Serialised credentials prepared in run() for the launched containers.
  private ByteBuffer allTokens;

  // Launch threads, one per allocated container; joined in finish().
  // NOTE(review): appended to from RMCallbackHandler.onContainersAllocated and
  // iterated in finish(); ArrayList is not thread-safe — confirm these never
  // run concurrently.
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Shell launchers; presumably chosen per platform when building the
  // container command (usage is outside this view — confirm).
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";
240    
241      /**
242       * @param args Command line args
243       */
244      public static void main(String[] args) {
245        boolean result = false;
246        try {
247          ApplicationMaster appMaster = new ApplicationMaster();
248          LOG.info("Initializing ApplicationMaster");
249          boolean doRun = appMaster.init(args);
250          if (!doRun) {
251            System.exit(0);
252          }
253          result = appMaster.run();
254        } catch (Throwable t) {
255          LOG.fatal("Error running ApplicationMaster", t);
256          System.exit(1);
257        }
258        if (result) {
259          LOG.info("Application Master completed successfully. exiting");
260          System.exit(0);
261        } else {
262          LOG.info("Application Master failed. exiting");
263          System.exit(2);
264        }
265      }
266    
267      /**
268       * Dump out contents of $CWD and the environment to stdout for debugging
269       */
270      private void dumpOutDebugInfo() {
271    
272        LOG.info("Dump debug output");
273        Map<String, String> envs = System.getenv();
274        for (Map.Entry<String, String> env : envs.entrySet()) {
275          LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
276          System.out.println("System env: key=" + env.getKey() + ", val="
277              + env.getValue());
278        }
279    
280        BufferedReader buf = null;
281        try {
282          String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
283            Shell.execCommand("ls", "-al");
284          buf = new BufferedReader(new StringReader(lines));
285          String line = "";
286          while ((line = buf.readLine()) != null) {
287            LOG.info("System CWD content: " + line);
288            System.out.println("System CWD content: " + line);
289          }
290        } catch (IOException e) {
291          e.printStackTrace();
292        } finally {
293          IOUtils.cleanup(LOG, buf);
294        }
295      }
296    
297      public ApplicationMaster() {
298        // Set up the configuration
299        conf = new YarnConfiguration();
300      }
301    
302      /**
303       * Parse command line options
304       *
305       * @param args Command line args
306       * @return Whether init successful and run should be invoked
307       * @throws ParseException
308       * @throws IOException
309       */
310      public boolean init(String[] args) throws ParseException, IOException {
311    
312        Options opts = new Options();
313        opts.addOption("app_attempt_id", true,
314            "App Attempt ID. Not to be used unless for testing purposes");
315        opts.addOption("shell_env", true,
316            "Environment for shell script. Specified as env_key=env_val pairs");
317        opts.addOption("container_memory", true,
318            "Amount of memory in MB to be requested to run the shell command");
319        opts.addOption("container_vcores", true,
320            "Amount of virtual cores to be requested to run the shell command");
321        opts.addOption("num_containers", true,
322            "No. of containers on which the shell command needs to be executed");
323        opts.addOption("priority", true, "Application Priority. Default 0");
324        opts.addOption("debug", false, "Dump out debug information");
325    
326        opts.addOption("help", false, "Print usage");
327        CommandLine cliParser = new GnuParser().parse(opts, args);
328    
329        if (args.length == 0) {
330          printUsage(opts);
331          throw new IllegalArgumentException(
332              "No args specified for application master to initialize");
333        }
334    
335        //Check whether customer log4j.properties file exists
336        if (fileExist(log4jPath)) {
337          try {
338            Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
339                log4jPath);
340          } catch (Exception e) {
341            LOG.warn("Can not set up custom log4j properties. " + e);
342          }
343        }
344    
345        if (cliParser.hasOption("help")) {
346          printUsage(opts);
347          return false;
348        }
349    
350        if (cliParser.hasOption("debug")) {
351          dumpOutDebugInfo();
352        }
353    
354        Map<String, String> envs = System.getenv();
355    
356        if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
357          if (cliParser.hasOption("app_attempt_id")) {
358            String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
359            appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
360          } else {
361            throw new IllegalArgumentException(
362                "Application Attempt Id not set in the environment");
363          }
364        } else {
365          ContainerId containerId = ConverterUtils.toContainerId(envs
366              .get(Environment.CONTAINER_ID.name()));
367          appAttemptID = containerId.getApplicationAttemptId();
368        }
369    
370        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
371          throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
372              + " not set in the environment");
373        }
374        if (!envs.containsKey(Environment.NM_HOST.name())) {
375          throw new RuntimeException(Environment.NM_HOST.name()
376              + " not set in the environment");
377        }
378        if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
379          throw new RuntimeException(Environment.NM_HTTP_PORT
380              + " not set in the environment");
381        }
382        if (!envs.containsKey(Environment.NM_PORT.name())) {
383          throw new RuntimeException(Environment.NM_PORT.name()
384              + " not set in the environment");
385        }
386    
387        LOG.info("Application master for app" + ", appId="
388            + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
389            + appAttemptID.getApplicationId().getClusterTimestamp()
390            + ", attemptId=" + appAttemptID.getAttemptId());
391    
392        if (!fileExist(shellCommandPath)
393            && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
394          throw new IllegalArgumentException(
395              "No shell command or shell script specified to be executed by application master");
396        }
397    
398        if (fileExist(shellCommandPath)) {
399          shellCommand = readContent(shellCommandPath);
400        }
401    
402        if (fileExist(shellArgsPath)) {
403          shellArgs = readContent(shellArgsPath);
404        }
405    
406        if (cliParser.hasOption("shell_env")) {
407          String shellEnvs[] = cliParser.getOptionValues("shell_env");
408          for (String env : shellEnvs) {
409            env = env.trim();
410            int index = env.indexOf('=');
411            if (index == -1) {
412              shellEnv.put(env, "");
413              continue;
414            }
415            String key = env.substring(0, index);
416            String val = "";
417            if (index < (env.length() - 1)) {
418              val = env.substring(index + 1);
419            }
420            shellEnv.put(key, val);
421          }
422        }
423    
424        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
425          shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
426    
427          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
428            shellScriptPathTimestamp = Long.valueOf(envs
429                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
430          }
431          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
432            shellScriptPathLen = Long.valueOf(envs
433                .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
434          }
435    
436          if (!shellScriptPath.isEmpty()
437              && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
438            LOG.error("Illegal values in env for shell script path" + ", path="
439                + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
440                + shellScriptPathTimestamp);
441            throw new IllegalArgumentException(
442                "Illegal values in env for shell script path");
443          }
444        }
445    
446        containerMemory = Integer.parseInt(cliParser.getOptionValue(
447            "container_memory", "10"));
448        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
449            "container_vcores", "1"));
450        numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
451            "num_containers", "1"));
452        if (numTotalContainers == 0) {
453          throw new IllegalArgumentException(
454              "Cannot run distributed shell with no containers");
455        }
456        requestPriority = Integer.parseInt(cliParser
457            .getOptionValue("priority", "0"));
458    
459        return true;
460      }
461    
462      /**
463       * Helper function to print usage
464       *
465       * @param opts Parsed command line options
466       */
467      private void printUsage(Options opts) {
468        new HelpFormatter().printHelp("ApplicationMaster", opts);
469      }
470    
471      /**
472       * Main run function for the application master
473       *
474       * @throws YarnException
475       * @throws IOException
476       */
477      @SuppressWarnings({ "unchecked" })
478      public boolean run() throws YarnException, IOException {
479        LOG.info("Starting ApplicationMaster");
480    
481        Credentials credentials =
482            UserGroupInformation.getCurrentUser().getCredentials();
483        DataOutputBuffer dob = new DataOutputBuffer();
484        credentials.writeTokenStorageToStream(dob);
485        // Now remove the AM->RM token so that containers cannot access it.
486        Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
487        while (iter.hasNext()) {
488          Token<?> token = iter.next();
489          if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
490            iter.remove();
491          }
492        }
493        allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
494    
495        AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
496        amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
497        amRMClient.init(conf);
498        amRMClient.start();
499    
500        containerListener = createNMCallbackHandler();
501        nmClientAsync = new NMClientAsyncImpl(containerListener);
502        nmClientAsync.init(conf);
503        nmClientAsync.start();
504    
505        // Setup local RPC Server to accept status requests directly from clients
506        // TODO need to setup a protocol for client to be able to communicate to
507        // the RPC server
508        // TODO use the rpc port info to register with the RM for the client to
509        // send requests to this app master
510    
511        // Register self with ResourceManager
512        // This will start heartbeating to the RM
513        appMasterHostname = NetUtils.getHostname();
514        RegisterApplicationMasterResponse response = amRMClient
515            .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
516                appMasterTrackingUrl);
517        // Dump out information about cluster capability as seen by the
518        // resource manager
519        int maxMem = response.getMaximumResourceCapability().getMemory();
520        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
521        
522        int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
523        LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
524    
525        // A resource ask cannot exceed the max.
526        if (containerMemory > maxMem) {
527          LOG.info("Container memory specified above max threshold of cluster."
528              + " Using max value." + ", specified=" + containerMemory + ", max="
529              + maxMem);
530          containerMemory = maxMem;
531        }
532    
533        if (containerVirtualCores > maxVCores) {
534          LOG.info("Container virtual cores specified above max threshold of cluster."
535              + " Using max value." + ", specified=" + containerVirtualCores + ", max="
536              + maxVCores);
537          containerVirtualCores = maxVCores;
538        }
539    
540        // Setup ask for containers from RM
541        // Send request for containers to RM
542        // Until we get our fully allocated quota, we keep on polling RM for
543        // containers
544        // Keep looping until all the containers are launched and shell script
545        // executed on them ( regardless of success/failure).
546        for (int i = 0; i < numTotalContainers; ++i) {
547          ContainerRequest containerAsk = setupContainerAskForRM();
548          amRMClient.addContainerRequest(containerAsk);
549        }
550        numRequestedContainers.set(numTotalContainers);
551    
552        while (!done
553            && (numCompletedContainers.get() != numTotalContainers)) {
554          try {
555            Thread.sleep(200);
556          } catch (InterruptedException ex) {}
557        }
558        finish();
559        
560        return success;
561      }
562    
563      @VisibleForTesting
564      NMCallbackHandler createNMCallbackHandler() {
565        return new NMCallbackHandler(this);
566      }
567    
568      private void finish() {
569        // Join all launched threads
570        // needed for when we time out
571        // and we need to release containers
572        for (Thread launchThread : launchThreads) {
573          try {
574            launchThread.join(10000);
575          } catch (InterruptedException e) {
576            LOG.info("Exception thrown in thread join: " + e.getMessage());
577            e.printStackTrace();
578          }
579        }
580    
581        // When the application completes, it should stop all running containers
582        LOG.info("Application completed. Stopping running containers");
583        nmClientAsync.stop();
584    
585        // When the application completes, it should send a finish application
586        // signal to the RM
587        LOG.info("Application completed. Signalling finish to RM");
588    
589        FinalApplicationStatus appStatus;
590        String appMessage = null;
591        success = true;
592        if (numFailedContainers.get() == 0 && 
593            numCompletedContainers.get() == numTotalContainers) {
594          appStatus = FinalApplicationStatus.SUCCEEDED;
595        } else {
596          appStatus = FinalApplicationStatus.FAILED;
597          appMessage = "Diagnostics." + ", total=" + numTotalContainers
598              + ", completed=" + numCompletedContainers.get() + ", allocated="
599              + numAllocatedContainers.get() + ", failed="
600              + numFailedContainers.get();
601          success = false;
602        }
603        try {
604          amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
605        } catch (YarnException ex) {
606          LOG.error("Failed to unregister application", ex);
607        } catch (IOException e) {
608          LOG.error("Failed to unregister application", e);
609        }
610        
611        amRMClient.stop();
612      }
613      
614      private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
615        @SuppressWarnings("unchecked")
616        @Override
617        public void onContainersCompleted(List<ContainerStatus> completedContainers) {
618          LOG.info("Got response from RM for container ask, completedCnt="
619              + completedContainers.size());
620          for (ContainerStatus containerStatus : completedContainers) {
621            LOG.info("Got container status for containerID="
622                + containerStatus.getContainerId() + ", state="
623                + containerStatus.getState() + ", exitStatus="
624                + containerStatus.getExitStatus() + ", diagnostics="
625                + containerStatus.getDiagnostics());
626    
627            // non complete containers should not be here
628            assert (containerStatus.getState() == ContainerState.COMPLETE);
629    
630            // increment counters for completed/failed containers
631            int exitStatus = containerStatus.getExitStatus();
632            if (0 != exitStatus) {
633              // container failed
634              if (ContainerExitStatus.ABORTED != exitStatus) {
635                // shell script failed
636                // counts as completed
637                numCompletedContainers.incrementAndGet();
638                numFailedContainers.incrementAndGet();
639              } else {
640                // container was killed by framework, possibly preempted
641                // we should re-try as the container was lost for some reason
642                numAllocatedContainers.decrementAndGet();
643                numRequestedContainers.decrementAndGet();
644                // we do not need to release the container as it would be done
645                // by the RM
646              }
647            } else {
648              // nothing to do
649              // container completed successfully
650              numCompletedContainers.incrementAndGet();
651              LOG.info("Container completed successfully." + ", containerId="
652                  + containerStatus.getContainerId());
653            }
654          }
655          
656          // ask for more containers if any failed
657          int askCount = numTotalContainers - numRequestedContainers.get();
658          numRequestedContainers.addAndGet(askCount);
659    
660          if (askCount > 0) {
661            for (int i = 0; i < askCount; ++i) {
662              ContainerRequest containerAsk = setupContainerAskForRM();
663              amRMClient.addContainerRequest(containerAsk);
664            }
665          }
666          
667          if (numCompletedContainers.get() == numTotalContainers) {
668            done = true;
669          }
670        }
671    
672        @Override
673        public void onContainersAllocated(List<Container> allocatedContainers) {
674          LOG.info("Got response from RM for container ask, allocatedCnt="
675              + allocatedContainers.size());
676          numAllocatedContainers.addAndGet(allocatedContainers.size());
677          for (Container allocatedContainer : allocatedContainers) {
678            LOG.info("Launching shell command on a new container."
679                + ", containerId=" + allocatedContainer.getId()
680                + ", containerNode=" + allocatedContainer.getNodeId().getHost()
681                + ":" + allocatedContainer.getNodeId().getPort()
682                + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
683                + ", containerResourceMemory"
684                + allocatedContainer.getResource().getMemory()
685                + ", containerResourceVirtualCores"
686                + allocatedContainer.getResource().getVirtualCores());
687            // + ", containerToken"
688            // +allocatedContainer.getContainerToken().getIdentifier().toString());
689    
690            LaunchContainerRunnable runnableLaunchContainer =
691                new LaunchContainerRunnable(allocatedContainer, containerListener);
692            Thread launchThread = new Thread(runnableLaunchContainer);
693    
694            // launch and start the container on a separate thread to keep
695            // the main thread unblocked
696            // as all containers may not be allocated at one go.
697            launchThreads.add(launchThread);
698            launchThread.start();
699          }
700        }
701    
702        @Override
703        public void onShutdownRequest() {
704          done = true;
705        }
706    
707        @Override
708        public void onNodesUpdated(List<NodeReport> updatedNodes) {}
709    
710        @Override
711        public float getProgress() {
712          // set progress to deliver to RM on next heartbeat
713          float progress = (float) numCompletedContainers.get()
714              / numTotalContainers;
715          return progress;
716        }
717    
718        @Override
719        public void onError(Throwable e) {
720          done = true;
721          amRMClient.stop();
722        }
723      }
724    
725      @VisibleForTesting
726      static class NMCallbackHandler
727        implements NMClientAsync.CallbackHandler {
728    
729        private ConcurrentMap<ContainerId, Container> containers =
730            new ConcurrentHashMap<ContainerId, Container>();
731        private final ApplicationMaster applicationMaster;
732    
733        public NMCallbackHandler(ApplicationMaster applicationMaster) {
734          this.applicationMaster = applicationMaster;
735        }
736    
737        public void addContainer(ContainerId containerId, Container container) {
738          containers.putIfAbsent(containerId, container);
739        }
740    
741        @Override
742        public void onContainerStopped(ContainerId containerId) {
743          if (LOG.isDebugEnabled()) {
744            LOG.debug("Succeeded to stop Container " + containerId);
745          }
746          containers.remove(containerId);
747        }
748    
749        @Override
750        public void onContainerStatusReceived(ContainerId containerId,
751            ContainerStatus containerStatus) {
752          if (LOG.isDebugEnabled()) {
753            LOG.debug("Container Status: id=" + containerId + ", status=" +
754                containerStatus);
755          }
756        }
757    
758        @Override
759        public void onContainerStarted(ContainerId containerId,
760            Map<String, ByteBuffer> allServiceResponse) {
761          if (LOG.isDebugEnabled()) {
762            LOG.debug("Succeeded to start Container " + containerId);
763          }
764          Container container = containers.get(containerId);
765          if (container != null) {
766            applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
767          }
768        }
769    
770        @Override
771        public void onStartContainerError(ContainerId containerId, Throwable t) {
772          LOG.error("Failed to start Container " + containerId);
773          containers.remove(containerId);
774          applicationMaster.numCompletedContainers.incrementAndGet();
775          applicationMaster.numFailedContainers.incrementAndGet();
776        }
777    
778        @Override
779        public void onGetContainerStatusError(
780            ContainerId containerId, Throwable t) {
781          LOG.error("Failed to query the status of Container " + containerId);
782        }
783    
784        @Override
785        public void onStopContainerError(ContainerId containerId, Throwable t) {
786          LOG.error("Failed to stop Container " + containerId);
787          containers.remove(containerId);
788        }
789      }
790    
791      /**
792       * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
793       * that will execute the shell command.
794       */
795      private class LaunchContainerRunnable implements Runnable {
796    
797        // Allocated container
798        Container container;
799    
800        NMCallbackHandler containerListener;
801    
802        /**
803         * @param lcontainer Allocated container
804         * @param containerListener Callback handler of the container
805         */
806        public LaunchContainerRunnable(
807            Container lcontainer, NMCallbackHandler containerListener) {
808          this.container = lcontainer;
809          this.containerListener = containerListener;
810        }
811    
812        @Override
813        /**
814         * Connects to CM, sets up container launch context 
815         * for shell command and eventually dispatches the container 
816         * start request to the CM. 
817         */
818        public void run() {
819          LOG.info("Setting up container launch container for containerid="
820              + container.getId());
821          ContainerLaunchContext ctx = Records
822              .newRecord(ContainerLaunchContext.class);
823    
824          // Set the environment
825          ctx.setEnvironment(shellEnv);
826    
827          // Set the local resources
828          Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
829    
830          // The container for the eventual shell commands needs its own local
831          // resources too.
832          // In this scenario, if a shell script is specified, we need to have it
833          // copied and made available to the container.
834          if (!shellScriptPath.isEmpty()) {
835            LocalResource shellRsrc = Records.newRecord(LocalResource.class);
836            shellRsrc.setType(LocalResourceType.FILE);
837            shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
838            try {
839              shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
840                  shellScriptPath)));
841            } catch (URISyntaxException e) {
842              LOG.error("Error when trying to use shell script path specified"
843                  + " in env, path=" + shellScriptPath);
844              e.printStackTrace();
845    
846              // A failure scenario on bad input such as invalid shell script path
847              // We know we cannot continue launching the container
848              // so we should release it.
849              // TODO
850              numCompletedContainers.incrementAndGet();
851              numFailedContainers.incrementAndGet();
852              return;
853            }
854            shellRsrc.setTimestamp(shellScriptPathTimestamp);
855            shellRsrc.setSize(shellScriptPathLen);
856            localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
857                ExecShellStringPath, shellRsrc);
858            shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
859          }
860          ctx.setLocalResources(localResources);
861    
862          // Set the necessary command to execute on the allocated container
863          Vector<CharSequence> vargs = new Vector<CharSequence>(5);
864    
865          // Set executable command
866          vargs.add(shellCommand);
867          // Set shell script path
868          if (!shellScriptPath.isEmpty()) {
869            vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
870                : ExecShellStringPath);
871          }
872    
873          // Set args for the shell command if any
874          vargs.add(shellArgs);
875          // Add log redirect params
876          vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
877          vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
878    
879          // Get final commmand
880          StringBuilder command = new StringBuilder();
881          for (CharSequence str : vargs) {
882            command.append(str).append(" ");
883          }
884    
885          List<String> commands = new ArrayList<String>();
886          commands.add(command.toString());
887          ctx.setCommands(commands);
888    
889          // Set up tokens for the container too. Today, for normal shell commands,
890          // the container in distribute-shell doesn't need any tokens. We are
891          // populating them mainly for NodeManagers to be able to download any
892          // files in the distributed file-system. The tokens are otherwise also
893          // useful in cases, for e.g., when one is running a "hadoop dfs" command
894          // inside the distributed shell.
895          ctx.setTokens(allTokens.duplicate());
896    
897          containerListener.addContainer(container.getId(), container);
898          nmClientAsync.startContainerAsync(container, ctx);
899        }
900      }
901    
902      /**
903       * Setup the request that will be sent to the RM for the container ask.
904       *
905       * @return the setup ResourceRequest to be sent to RM
906       */
907      private ContainerRequest setupContainerAskForRM() {
908        // setup requirements for hosts
909        // using * as any host will do for the distributed shell app
910        // set the priority for the request
911        Priority pri = Records.newRecord(Priority.class);
912        // TODO - what is the range for priority? how to decide?
913        pri.setPriority(requestPriority);
914    
915        // Set up resource type requirements
916        // For now, memory and CPU are supported so we set memory and cpu requirements
917        Resource capability = Records.newRecord(Resource.class);
918        capability.setMemory(containerMemory);
919        capability.setVirtualCores(containerVirtualCores);
920    
921        ContainerRequest request = new ContainerRequest(capability, null, null,
922            pri);
923        LOG.info("Requested container ask: " + request.toString());
924        return request;
925      }
926    
927      private boolean fileExist(String filePath) {
928        return new File(filePath).exists();
929      }
930    
931      private String readContent(String filePath) throws IOException {
932        DataInputStream ds = null;
933        try {
934          ds = new DataInputStream(new FileInputStream(filePath));
935          return ds.readUTF();
936        } finally {
937          org.apache.commons.io.IOUtils.closeQuietly(ds);
938        }
939      }
940    }