001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.client.api.async;
020
021import java.nio.ByteBuffer;
022import java.util.Map;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.hadoop.classification.InterfaceAudience.Private;
026import org.apache.hadoop.classification.InterfaceAudience.Public;
027import org.apache.hadoop.classification.InterfaceStability.Stable;
028import org.apache.hadoop.service.AbstractService;
029import org.apache.hadoop.yarn.api.records.Container;
030import org.apache.hadoop.yarn.api.records.ContainerId;
031import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
032import org.apache.hadoop.yarn.api.records.ContainerStatus;
033import org.apache.hadoop.yarn.api.records.NodeId;
034import org.apache.hadoop.yarn.api.records.Token;
035import org.apache.hadoop.yarn.client.api.NMClient;
036import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
037import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
038import org.apache.hadoop.yarn.conf.YarnConfiguration;
039
040import com.google.common.annotations.VisibleForTesting;
041
042/**
043 * <code>NMClientAsync</code> handles communication with all the NodeManagers
044 * and provides asynchronous updates on getting responses from them. It
045 * maintains a thread pool to communicate with individual NMs where a number of
046 * worker threads process requests to NMs by using {@link NMClientImpl}. The max
047 * size of the thread pool is configurable through
048 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
049 *
050 * It should be used in conjunction with a CallbackHandler. For example
051 *
052 * <pre>
053 * {@code
054 * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
055 *   public void onContainerStarted(ContainerId containerId,
056 *       Map<String, ByteBuffer> allServiceResponse) {
057 *     [post process after the container is started, process the response]
058 *   }
059 *
060 *   public void onContainerStatusReceived(ContainerId containerId,
061 *       ContainerStatus containerStatus) {
062 *     [make use of the status of the container]
063 *   }
064 *
065 *   public void onContainerStopped(ContainerId containerId) {
066 *     [post process after the container is stopped]
067 *   }
068 *
069 *   public void onStartContainerError(
070 *       ContainerId containerId, Throwable t) {
071 *     [handle the raised exception]
072 *   }
073 *
074 *   public void onGetContainerStatusError(
075 *       ContainerId containerId, Throwable t) {
076 *     [handle the raised exception]
077 *   }
078 *
079 *   public void onStopContainerError(
080 *       ContainerId containerId, Throwable t) {
081 *     [handle the raised exception]
082 *   }
083 * }
084 * }
085 * </pre>
086 *
087 * The client's life-cycle should be managed like the following:
088 *
089 * <pre>
090 * {@code
091 * NMClientAsync asyncClient = 
092 *     NMClientAsync.createNMClientAsync(new MyCallbackhandler());
093 * asyncClient.init(conf);
094 * asyncClient.start();
095 * asyncClient.startContainer(container, containerLaunchContext);
096 * [... wait for container being started]
097 * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
098 *     container.getContainerToken());
099 * [... handle the status in the callback instance]
100 * asyncClient.stopContainer(container.getId(), container.getNodeId(),
101 *     container.getContainerToken());
102 * [... wait for container being stopped]
103 * asyncClient.stop();
104 * }
105 * </pre>
106 */
107@Public
108@Stable
109public abstract class NMClientAsync extends AbstractService {
110
111  protected NMClient client;
112  protected CallbackHandler callbackHandler;
113
114  public static NMClientAsync createNMClientAsync(
115      CallbackHandler callbackHandler) {
116    return new NMClientAsyncImpl(callbackHandler);
117  }
118  
119  protected NMClientAsync(CallbackHandler callbackHandler) {
120    this (NMClientAsync.class.getName(), callbackHandler);
121  }
122
123  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
124    this (name, new NMClientImpl(), callbackHandler);
125  }
126
127  @Private
128  @VisibleForTesting
129  protected NMClientAsync(String name, NMClient client,
130      CallbackHandler callbackHandler) {
131    super(name);
132    this.setClient(client);
133    this.setCallbackHandler(callbackHandler);
134  }
135
136  public abstract void startContainerAsync(
137      Container container, ContainerLaunchContext containerLaunchContext);
138
139  public abstract void stopContainerAsync(
140      ContainerId containerId, NodeId nodeId);
141
142  public abstract void getContainerStatusAsync(
143      ContainerId containerId, NodeId nodeId);
144  
145  public NMClient getClient() {
146    return client;
147  }
148
149  public void setClient(NMClient client) {
150    this.client = client;
151  }
152
153  public CallbackHandler getCallbackHandler() {
154    return callbackHandler;
155  }
156
157  public void setCallbackHandler(CallbackHandler callbackHandler) {
158    this.callbackHandler = callbackHandler;
159  }
160
161  /**
162   * <p>
163   * The callback interface needs to be implemented by {@link NMClientAsync}
164   * users. The APIs are called when responses from <code>NodeManager</code> are
165   * available.
166   * </p>
167   *
168   * <p>
169   * Once a callback happens, the users can chose to act on it in blocking or
170   * non-blocking manner. If the action on callback is done in a blocking
171   * manner, some of the threads performing requests on NodeManagers may get
172   * blocked depending on how many threads in the pool are busy.
173   * </p>
174   *
175   * <p>
176   * The implementation of the callback function should not throw the
177   * unexpected exception. Otherwise, {@link NMClientAsync} will just
178   * catch, log and then ignore it.
179   * </p>
180   */
181  public static interface CallbackHandler {
182    /**
183     * The API is called when <code>NodeManager</code> responds to indicate its
184     * acceptance of the starting container request
185     * @param containerId the Id of the container
186     * @param allServiceResponse a Map between the auxiliary service names and
187     *                           their outputs
188     */
189    void onContainerStarted(ContainerId containerId,
190        Map<String, ByteBuffer> allServiceResponse);
191
192    /**
193     * The API is called when <code>NodeManager</code> responds with the status
194     * of the container
195     * @param containerId the Id of the container
196     * @param containerStatus the status of the container
197     */
198    void onContainerStatusReceived(ContainerId containerId,
199        ContainerStatus containerStatus);
200
201    /**
202     * The API is called when <code>NodeManager</code> responds to indicate the
203     * container is stopped.
204     * @param containerId the Id of the container
205     */
206    void onContainerStopped(ContainerId containerId);
207
208    /**
209     * The API is called when an exception is raised in the process of
210     * starting a container
211     *
212     * @param containerId the Id of the container
213     * @param t the raised exception
214     */
215    void onStartContainerError(ContainerId containerId, Throwable t);
216
217    /**
218     * The API is called when an exception is raised in the process of
219     * querying the status of a container
220     *
221     * @param containerId the Id of the container
222     * @param t the raised exception
223     */
224    void onGetContainerStatusError(ContainerId containerId, Throwable t);
225
226    /**
227     * The API is called when an exception is raised in the process of
228     * stopping a container
229     *
230     * @param containerId the Id of the container
231     * @param t the raised exception
232     */
233    void onStopContainerError(ContainerId containerId, Throwable t);
234
235  }
236
237}