001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.InputStreamReader;
024import java.net.InetSocketAddress;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Vector;
032import java.util.concurrent.CopyOnWriteArrayList;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.commons.cli.CommandLine;
036import org.apache.commons.cli.GnuParser;
037import org.apache.commons.cli.HelpFormatter;
038import org.apache.commons.cli.Options;
039import org.apache.commons.cli.ParseException;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042
043import org.apache.hadoop.classification.InterfaceAudience;
044import org.apache.hadoop.classification.InterfaceStability;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.net.NetUtils;
047import org.apache.hadoop.security.UserGroupInformation;
048import org.apache.hadoop.yarn.api.AMRMProtocol;
049import org.apache.hadoop.yarn.api.ApplicationConstants;
050import org.apache.hadoop.yarn.api.ContainerManager;
051
052import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
053import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
054import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
055//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
056//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
057import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
058import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
059import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
060
061import org.apache.hadoop.yarn.api.records.AMResponse;
062import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
063import org.apache.hadoop.yarn.api.records.Container;
064import org.apache.hadoop.yarn.api.records.ContainerId;
065import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
066import org.apache.hadoop.yarn.api.records.ContainerState;
067import org.apache.hadoop.yarn.api.records.ContainerStatus;
068import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
069import org.apache.hadoop.yarn.api.records.LocalResource;
070import org.apache.hadoop.yarn.api.records.LocalResourceType;
071import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
072import org.apache.hadoop.yarn.api.records.Priority;
073import org.apache.hadoop.yarn.api.records.Resource;
074import org.apache.hadoop.yarn.api.records.ResourceRequest;
075import org.apache.hadoop.yarn.conf.YarnConfiguration;
076import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
077import org.apache.hadoop.yarn.ipc.YarnRPC;
078import org.apache.hadoop.yarn.util.ConverterUtils;
079import org.apache.hadoop.yarn.util.Records;
080
081/**
082 * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 
083 * 
084 * <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
085 * 
086 * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 
087 * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 
088 * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
089 * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
090 * as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
091 * 
092 * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
093 * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the 
094 * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat.
095 * 
096 * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the 
097 * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
098 * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
099 * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 
100 * of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
101 * 
102 * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 
103 * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 
104 * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 
105 * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
106 *  
107 * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code> 
108 * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager} 
109 * by querying for the status of the allocated container's {@link ContainerId}.
110 * 
111 * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest} 
112 * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. 
113 */
114@InterfaceAudience.Public
115@InterfaceStability.Unstable
116public class ApplicationMaster {
117
118  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
119
120  // Configuration 
121  private Configuration conf;
122  // YARN RPC to communicate with the Resource Manager or Node Manager
123  private YarnRPC rpc;
124
125  // Handle to communicate with the Resource Manager
126  private AMRMProtocol resourceManager;
127
128  // Application Attempt Id ( combination of attemptId and fail count )
129  private ApplicationAttemptId appAttemptID;
130
131  // TODO
132  // For status update for clients - yet to be implemented
133  // Hostname of the container 
134  private String appMasterHostname = "";
135  // Port on which the app master listens for status update requests from clients
136  private int appMasterRpcPort = 0;
137  // Tracking url to which app master publishes info for clients to monitor 
138  private String appMasterTrackingUrl = "";
139
140  // App Master configuration
141  // No. of containers to run shell command on
142  private int numTotalContainers = 1;
143  // Memory to request for the container on which the shell command will run 
144  private int containerMemory = 10;
145  // Priority of the request
146  private int requestPriority; 
147
148  // Incremental counter for rpc calls to the RM
149  private AtomicInteger rmRequestID = new AtomicInteger();
150
151  // Simple flag to denote whether all works is done
152  private boolean appDone = false; 
153  // Counter for completed containers ( complete denotes successful or failed )
154  private AtomicInteger numCompletedContainers = new AtomicInteger();
155  // Allocated container count so that we know how many containers has the RM
156  // allocated to us
157  private AtomicInteger numAllocatedContainers = new AtomicInteger();
158  // Count of failed containers 
159  private AtomicInteger numFailedContainers = new AtomicInteger();
160  // Count of containers already requested from the RM
161  // Needed as once requested, we should not request for containers again and again. 
162  // Only request for more if the original requirement changes. 
163  private AtomicInteger numRequestedContainers = new AtomicInteger();
164
165  // Shell command to be executed 
166  private String shellCommand = ""; 
167  // Args to be passed to the shell command
168  private String shellArgs = "";
169  // Env variables to be setup for the shell command 
170  private Map<String, String> shellEnv = new HashMap<String, String>();
171
172  // Location of shell script ( obtained from info set in env )
173  // Shell script path in fs
174  private String shellScriptPath = ""; 
175  // Timestamp needed for creating a local resource
176  private long shellScriptPathTimestamp = 0;
177  // File length needed for local resource
178  private long shellScriptPathLen = 0;
179
180  // Hardcoded path to shell script in launch container's local env
181  private final String ExecShellStringPath = "ExecShellScript.sh";
182
183  // Containers to be released
184  private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
185
186  // Launch threads
187  private List<Thread> launchThreads = new ArrayList<Thread>();
188
189  /**
190   * @param args Command line args
191   */
192  public static void main(String[] args) {
193    boolean result = false;
194    try {
195      ApplicationMaster appMaster = new ApplicationMaster();
196      LOG.info("Initializing ApplicationMaster");
197      boolean doRun = appMaster.init(args);
198      if (!doRun) {
199        System.exit(0);
200      }
201      result = appMaster.run();
202    } catch (Throwable t) {
203      LOG.fatal("Error running ApplicationMaster", t);
204      System.exit(1);
205    }
206    if (result) {
207      LOG.info("Application Master completed successfully. exiting");
208      System.exit(0);
209    }
210    else {
211      LOG.info("Application Master failed. exiting");
212      System.exit(2);
213    }
214  }
215
216  /**
217   * Dump out contents of $CWD and the environment to stdout for debugging
218   */
219  private void dumpOutDebugInfo() {
220
221    LOG.info("Dump debug output");
222    Map<String, String> envs = System.getenv();
223    for (Map.Entry<String, String> env : envs.entrySet()) {
224      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
225      System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
226    }
227
228    String cmd = "ls -al";
229    Runtime run = Runtime.getRuntime();
230    Process pr = null;
231    try {
232      pr = run.exec(cmd);
233      pr.waitFor();
234
235      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
236      String line = "";
237      while ((line=buf.readLine())!=null) {
238        LOG.info("System CWD content: " + line);
239        System.out.println("System CWD content: " + line);
240      }
241      buf.close();
242    } catch (IOException e) {
243      e.printStackTrace();
244    } catch (InterruptedException e) {
245      e.printStackTrace();
246    } 
247  }
248
/**
 * Creates an ApplicationMaster with a fresh {@link Configuration} and a
 * {@link YarnRPC} built from that configuration.
 *
 * @throws Exception as declared by this constructor's signature
 */
public ApplicationMaster() throws Exception {
  Configuration freshConf = new Configuration();
  this.conf = freshConf;
  this.rpc = YarnRPC.create(freshConf);
}
254  /**
255   * Parse command line options
256   * @param args Command line args 
257   * @return Whether init successful and run should be invoked 
258   * @throws ParseException
259   * @throws IOException 
260   */
261  public boolean init(String[] args) throws ParseException, IOException {
262
263    Options opts = new Options();
264    opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
265    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
266    opts.addOption("shell_script", true, "Location of the shell script to be executed");
267    opts.addOption("shell_args", true, "Command line args for the shell script");
268    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
269    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
270    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
271    opts.addOption("priority", true, "Application Priority. Default 0");
272    opts.addOption("debug", false, "Dump out debug information");
273
274    opts.addOption("help", false, "Print usage");
275    CommandLine cliParser = new GnuParser().parse(opts, args);
276
277    if (args.length == 0) {
278      printUsage(opts);
279      throw new IllegalArgumentException("No args specified for application master to initialize");
280    }
281
282    if (cliParser.hasOption("help")) {
283      printUsage(opts);
284      return false;
285    }
286
287    if (cliParser.hasOption("debug")) {
288      dumpOutDebugInfo();
289    }
290
291    Map<String, String> envs = System.getenv();
292
293    appAttemptID = Records.newRecord(ApplicationAttemptId.class);
294    if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
295      if (cliParser.hasOption("app_attempt_id")) {
296        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
297        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
298      } 
299      else {
300        throw new IllegalArgumentException("Application Attempt Id not set in the environment");
301      }
302    } else {
303      ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
304      appAttemptID = containerId.getApplicationAttemptId();
305    }
306
307    LOG.info("Application master for app"
308        + ", appId=" + appAttemptID.getApplicationId().getId()
309        + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
310        + ", attemptId=" + appAttemptID.getAttemptId());
311
312    if (!cliParser.hasOption("shell_command")) {
313      throw new IllegalArgumentException("No shell command specified to be executed by application master");
314    }
315    shellCommand = cliParser.getOptionValue("shell_command");
316
317    if (cliParser.hasOption("shell_args")) {
318      shellArgs = cliParser.getOptionValue("shell_args");
319    }
320    if (cliParser.hasOption("shell_env")) { 
321      String shellEnvs[] = cliParser.getOptionValues("shell_env");
322      for (String env : shellEnvs) {
323        env = env.trim();
324        int index = env.indexOf('=');
325        if (index == -1) {
326          shellEnv.put(env, "");
327          continue;
328        }
329        String key = env.substring(0, index);
330        String val = "";
331        if (index < (env.length()-1)) {
332          val = env.substring(index+1);
333        }
334        shellEnv.put(key, val);
335      }
336    }
337
338    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
339      shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
340
341      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
342        shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
343      }
344      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
345        shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
346      }
347
348      if (!shellScriptPath.isEmpty()
349          && (shellScriptPathTimestamp <= 0 
350          || shellScriptPathLen <= 0)) {
351        LOG.error("Illegal values in env for shell script path"
352            + ", path=" + shellScriptPath
353            + ", len=" + shellScriptPathLen
354            + ", timestamp=" + shellScriptPathTimestamp);
355        throw new IllegalArgumentException("Illegal values in env for shell script path");
356      }
357    }
358
359    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
360    numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
361    requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
362
363    return true;
364  }
365
366  /**
367   * Helper function to print usage 
368   * @param opts Parsed command line options
369   */
370  private void printUsage(Options opts) {
371    new HelpFormatter().printHelp("ApplicationMaster", opts);
372  }
373
374  /**
375   * Main run function for the application master
376   * @throws YarnRemoteException
377   */
378  public boolean run() throws YarnRemoteException {
379    LOG.info("Starting ApplicationMaster");
380
381    // Connect to ResourceManager
382    resourceManager = connectToRM();
383
384    // Setup local RPC Server to accept status requests directly from clients 
385    // TODO need to setup a protocol for client to be able to communicate to the RPC server 
386    // TODO use the rpc port info to register with the RM for the client to send requests to this app master
387
388    // Register self with ResourceManager 
389    RegisterApplicationMasterResponse response = registerToRM();
390    // Dump out information about cluster capability as seen by the resource manager
391    int minMem = response.getMinimumResourceCapability().getMemory();
392    int maxMem = response.getMaximumResourceCapability().getMemory();
393    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
394    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
395
396    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
397    // a multiple of the min value and cannot exceed the max. 
398    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
399    if (containerMemory < minMem) {
400      LOG.info("Container memory specified below min threshold of cluster. Using min value."
401          + ", specified=" + containerMemory
402          + ", min=" + minMem);
403      containerMemory = minMem; 
404    } 
405    else if (containerMemory > maxMem) {
406      LOG.info("Container memory specified above max threshold of cluster. Using max value."
407          + ", specified=" + containerMemory
408          + ", max=" + maxMem);
409      containerMemory = maxMem;
410    }
411
412    // Setup heartbeat emitter
413    // TODO poll RM every now and then with an empty request to let RM know that we are alive
414    // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 
415    // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
416    // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 
417    // is not required.
418
419    // Setup ask for containers from RM
420    // Send request for containers to RM
421    // Until we get our fully allocated quota, we keep on polling RM for containers
422    // Keep looping until all the containers are launched and shell script executed on them 
423    // ( regardless of success/failure). 
424
425    int loopCounter = -1;
426
427    while (numCompletedContainers.get() < numTotalContainers
428        && !appDone) {
429      loopCounter++;
430
431      // log current state
432      LOG.info("Current application state: loop=" + loopCounter 
433          + ", appDone=" + appDone
434          + ", total=" + numTotalContainers
435          + ", requested=" + numRequestedContainers
436          + ", completed=" + numCompletedContainers
437          + ", failed=" + numFailedContainers
438          + ", currentAllocated=" + numAllocatedContainers);
439
440      // Sleep before each loop when asking RM for containers
441      // to avoid flooding RM with spurious requests when it 
442      // need not have any available containers 
443      // Sleeping for 1000 ms.
444      try {
445        Thread.sleep(1000);
446      } catch (InterruptedException e) {
447        LOG.info("Sleep interrupted " + e.getMessage());
448      }
449
450      // No. of containers to request 
451      // For the first loop, askCount will be equal to total containers needed 
452      // From that point on, askCount will always be 0 as current implementation 
453      // does not change its ask on container failures. 
454      int askCount = numTotalContainers - numRequestedContainers.get();
455      numRequestedContainers.addAndGet(askCount);
456
457      // Setup request to be sent to RM to allocate containers
458      List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
459      if (askCount > 0) {
460        ResourceRequest containerAsk = setupContainerAskForRM(askCount);
461        resourceReq.add(containerAsk);
462      }
463
464      // Send the request to RM 
465      LOG.info("Asking RM for containers"
466          + ", askCount=" + askCount);
467      AMResponse amResp =sendContainerAskToRM(resourceReq);
468
469      // Retrieve list of allocated containers from the response 
470      List<Container> allocatedContainers = amResp.getAllocatedContainers();
471      LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
472      numAllocatedContainers.addAndGet(allocatedContainers.size());
473      for (Container allocatedContainer : allocatedContainers) {
474        LOG.info("Launching shell command on a new container."
475            + ", containerId=" + allocatedContainer.getId()
476            + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
477            + ":" + allocatedContainer.getNodeId().getPort()
478            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
479            + ", containerState" + allocatedContainer.getState()
480            + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
481        //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
482
483        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
484        Thread launchThread = new Thread(runnableLaunchContainer);
485
486        // launch and start the container on a separate thread to keep the main thread unblocked
487        // as all containers may not be allocated at one go.
488        launchThreads.add(launchThread);
489        launchThread.start();
490      }
491
492      // Check what the current available resources in the cluster are
493      // TODO should we do anything if the available resources are not enough? 
494      Resource availableResources = amResp.getAvailableResources();
495      LOG.info("Current available resources in the cluster " + availableResources);
496
497      // Check the completed containers
498      List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
499      LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
500      for (ContainerStatus containerStatus : completedContainers) {
501        LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
502            + ", state=" + containerStatus.getState()
503            + ", exitStatus=" + containerStatus.getExitStatus() 
504            + ", diagnostics=" + containerStatus.getDiagnostics());
505
506        // non complete containers should not be here 
507        assert(containerStatus.getState() == ContainerState.COMPLETE);
508
509        // increment counters for completed/failed containers
510        int exitStatus = containerStatus.getExitStatus();
511        if (0 != exitStatus) {
512          // container failed 
513          if (-100 != exitStatus) {
514            // shell script failed
515            // counts as completed 
516            numCompletedContainers.incrementAndGet();
517            numFailedContainers.incrementAndGet();
518          }
519          else { 
520            // something else bad happened 
521            // app job did not complete for some reason 
522            // we should re-try as the container was lost for some reason
523            numAllocatedContainers.decrementAndGet();
524            numRequestedContainers.decrementAndGet();
525            // we do not need to release the container as it would be done
526            // by the RM/CM.
527          }
528        }
529        else { 
530          // nothing to do 
531          // container completed successfully 
532          numCompletedContainers.incrementAndGet();
533          LOG.info("Container completed successfully."
534              + ", containerId=" + containerStatus.getContainerId());
535        }
536
537      }
538      if (numCompletedContainers.get() == numTotalContainers) {
539        appDone = true;
540      }
541
542      LOG.info("Current application state: loop=" + loopCounter
543          + ", appDone=" + appDone
544          + ", total=" + numTotalContainers
545          + ", requested=" + numRequestedContainers
546          + ", completed=" + numCompletedContainers
547          + ", failed=" + numFailedContainers
548          + ", currentAllocated=" + numAllocatedContainers);
549
550      // TODO 
551      // Add a timeout handling layer 
552      // for misbehaving shell commands
553    }
554
555    // Join all launched threads
556    // needed for when we time out 
557    // and we need to release containers
558    for (Thread launchThread : launchThreads) {
559      try {
560        launchThread.join(10000);
561      } catch (InterruptedException e) {
562        LOG.info("Exception thrown in thread join: " + e.getMessage());
563        e.printStackTrace();
564      }
565    }
566
567    // When the application completes, it should send a finish application signal 
568    // to the RM
569    LOG.info("Application completed. Signalling finish to RM");
570
571    FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
572    finishReq.setAppAttemptId(appAttemptID);
573    boolean isSuccess = true;
574    if (numFailedContainers.get() == 0) {
575      finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
576    }
577    else {
578      finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
579      String diagnostics = "Diagnostics."
580          + ", total=" + numTotalContainers
581          + ", completed=" + numCompletedContainers.get()
582          + ", allocated=" + numAllocatedContainers.get()
583          + ", failed=" + numFailedContainers.get();
584      finishReq.setDiagnostics(diagnostics);
585      isSuccess = false;
586    }
587    resourceManager.finishApplicationMaster(finishReq);
588    return isSuccess;
589  }
590
591  /**
592   * Thread to connect to the {@link ContainerManager} and 
593   * launch the container that will execute the shell command. 
594   */
595  private class LaunchContainerRunnable implements Runnable {
596
597    // Allocated container 
598    Container container;
599    // Handle to communicate with ContainerManager
600    ContainerManager cm;
601
602    /**
603     * @param lcontainer Allocated container
604     */
605    public LaunchContainerRunnable(Container lcontainer) {
606      this.container = lcontainer;
607    }
608
609    /**
610     * Helper function to connect to CM
611     */
612    private void connectToCM() {
613      LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
614      String cmIpPortStr = container.getNodeId().getHost() + ":"
615          + container.getNodeId().getPort();
616      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
617      LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
618      this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
619    }
620
621
622    @Override
623    /**
624     * Connects to CM, sets up container launch context 
625     * for shell command and eventually dispatches the container 
626     * start request to the CM. 
627     */
628    public void run() {
629      // Connect to ContainerManager 
630      connectToCM();
631
632      LOG.info("Setting up container launch container for containerid=" + container.getId());
633      ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
634
635      ctx.setContainerId(container.getId());
636      ctx.setResource(container.getResource());
637
638      try {
639        ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
640      } catch (IOException e) {
641        LOG.info("Getting current user info failed when trying to launch the container"
642            + e.getMessage());
643      }
644
645      // Set the environment 
646      ctx.setEnvironment(shellEnv);
647
648      // Set the local resources 
649      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
650
651      // The container for the eventual shell commands needs its own local resources too. 
652      // In this scenario, if a shell script is specified, we need to have it copied 
653      // and made available to the container. 
654      if (!shellScriptPath.isEmpty()) {
655        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
656        shellRsrc.setType(LocalResourceType.FILE);
657        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
658        try {
659          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
660        } catch (URISyntaxException e) {
661          LOG.error("Error when trying to use shell script path specified in env"
662              + ", path=" + shellScriptPath);
663          e.printStackTrace();
664
665          // A failure scenario on bad input such as invalid shell script path 
666          // We know we cannot continue launching the container 
667          // so we should release it.
668          // TODO
669          numCompletedContainers.incrementAndGet();
670          numFailedContainers.incrementAndGet();
671          return;
672        }
673        shellRsrc.setTimestamp(shellScriptPathTimestamp);
674        shellRsrc.setSize(shellScriptPathLen);
675        localResources.put(ExecShellStringPath, shellRsrc);
676      }
677      ctx.setLocalResources(localResources);
678
679      // Set the necessary command to execute on the allocated container 
680      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
681
682      // Set executable command 
683      vargs.add(shellCommand);
684      // Set shell script path 
685      if (!shellScriptPath.isEmpty()) {
686        vargs.add(ExecShellStringPath);
687      }
688
689      // Set args for the shell command if any
690      vargs.add(shellArgs);
691      // Add log redirect params
692      // TODO
693      // We should redirect the output to hdfs instead of local logs 
694      // so as to be able to look at the final output after the containers 
695      // have been released. 
696      // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 
697      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
698      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
699
700      // Get final commmand
701      StringBuilder command = new StringBuilder();
702      for (CharSequence str : vargs) {
703        command.append(str).append(" ");
704      }
705
706      List<String> commands = new ArrayList<String>();
707      commands.add(command.toString());
708      ctx.setCommands(commands);
709
710      StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
711      startReq.setContainerLaunchContext(ctx);
712      try {
713        cm.startContainer(startReq);
714      } catch (YarnRemoteException e) {
715        LOG.info("Start container failed for :"
716            + ", containerId=" + container.getId());
717        e.printStackTrace();
718        // TODO do we need to release this container? 
719      }
720
721      // Get container status?
722      // Left commented out as the shell scripts are short lived 
723      // and we are relying on the status for completed containers from RM to detect status
724
725      //    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
726      //    statusReq.setContainerId(container.getId());
727      //    GetContainerStatusResponse statusResp;
728      //try {
729      //statusResp = cm.getContainerStatus(statusReq);
730      //    LOG.info("Container Status"
731      //    + ", id=" + container.getId()
732      //    + ", status=" +statusResp.getStatus());
733      //} catch (YarnRemoteException e) {
734      //e.printStackTrace();
735      //}
736    }
737  }
738
739  /**
740   * Connect to the Resource Manager
741   * @return Handle to communicate with the RM
742   */
743  private AMRMProtocol connectToRM() {
744    YarnConfiguration yarnConf = new YarnConfiguration(conf);
745    InetSocketAddress rmAddress = yarnConf.getSocketAddr(
746        YarnConfiguration.RM_SCHEDULER_ADDRESS,
747        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
748        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
749    LOG.info("Connecting to ResourceManager at " + rmAddress);
750    return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
751  }
752
753  /** 
754   * Register the Application Master to the Resource Manager
755   * @return the registration response from the RM
756   * @throws YarnRemoteException
757   */
758  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
759    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
760
761    // set the required info into the registration request: 
762    // application attempt id, 
763    // host on which the app master is running
764    // rpc port on which the app master accepts requests from the client 
765    // tracking url for the app master
766    appMasterRequest.setApplicationAttemptId(appAttemptID);
767    appMasterRequest.setHost(appMasterHostname);
768    appMasterRequest.setRpcPort(appMasterRpcPort);
769    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
770
771    return resourceManager.registerApplicationMaster(appMasterRequest);
772  }
773
774  /**
775   * Setup the request that will be sent to the RM for the container ask.
776   * @param numContainers Containers to ask for from RM
777   * @return the setup ResourceRequest to be sent to RM
778   */
779  private ResourceRequest setupContainerAskForRM(int numContainers) {
780    ResourceRequest request = Records.newRecord(ResourceRequest.class);
781
782    // setup requirements for hosts 
783    // whether a particular rack/host is needed 
784    // Refer to apis under org.apache.hadoop.net for more 
785    // details on how to get figure out rack/host mapping.
786    // using * as any host will do for the distributed shell app
787    request.setHostName("*");
788
789    // set no. of containers needed
790    request.setNumContainers(numContainers);
791
792    // set the priority for the request
793    Priority pri = Records.newRecord(Priority.class);
794    // TODO - what is the range for priority? how to decide? 
795    pri.setPriority(requestPriority);
796    request.setPriority(pri);
797
798    // Set up resource type requirements
799    // For now, only memory is supported so we set memory requirements
800    Resource capability = Records.newRecord(Resource.class);
801    capability.setMemory(containerMemory);
802    request.setCapability(capability);
803
804    return request;
805  }
806
807  /**
808   * Ask RM to allocate given no. of containers to this Application Master
809   * @param requestedContainers Containers to ask for from RM
810   * @return Response from RM to AM with allocated containers 
811   * @throws YarnRemoteException
812   */
813  private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
814      throws YarnRemoteException {
815    AllocateRequest req = Records.newRecord(AllocateRequest.class);
816    req.setResponseId(rmRequestID.incrementAndGet());
817    req.setApplicationAttemptId(appAttemptID);
818    req.addAllAsks(requestedContainers);
819    req.addAllReleases(releasedContainers);
820    req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
821
822    LOG.info("Sending request to RM for containers"
823        + ", requestedSet=" + requestedContainers.size()
824        + ", releasedSet=" + releasedContainers.size()
825        + ", progress=" + req.getProgress());
826
827    for (ResourceRequest  rsrcReq : requestedContainers) {
828      LOG.info("Requested container ask: " + rsrcReq.toString());
829    }
830    for (ContainerId id : releasedContainers) {
831      LOG.info("Released container, id=" + id.getId());
832    }
833
834    AllocateResponse resp = resourceManager.allocate(req);
835    return resp.getAMResponse();
836  }
837}