001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.client.api.async;
020
021import java.nio.ByteBuffer;
022import java.util.Map;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.hadoop.classification.InterfaceAudience.Private;
026import org.apache.hadoop.classification.InterfaceAudience.Public;
027import org.apache.hadoop.classification.InterfaceStability.Stable;
028import org.apache.hadoop.service.AbstractService;
029import org.apache.hadoop.yarn.api.records.Container;
030import org.apache.hadoop.yarn.api.records.ContainerId;
031import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
032import org.apache.hadoop.yarn.api.records.ContainerStatus;
033import org.apache.hadoop.yarn.api.records.NodeId;
034import org.apache.hadoop.yarn.api.records.Resource;
035import org.apache.hadoop.yarn.api.records.Token;
036import org.apache.hadoop.yarn.client.api.NMClient;
037import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
038import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
039import org.apache.hadoop.yarn.conf.YarnConfiguration;
040
041import com.google.common.annotations.VisibleForTesting;
042
043/**
044 * <code>NMClientAsync</code> handles communication with all the NodeManagers
045 * and provides asynchronous updates on getting responses from them. It
046 * maintains a thread pool to communicate with individual NMs where a number of
047 * worker threads process requests to NMs by using {@link NMClientImpl}. The max
048 * size of the thread pool is configurable through
049 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
050 *
051 * It should be used in conjunction with a CallbackHandler. For example
052 *
053 * <pre>
054 * {@code
055 * class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
056 *   public void onContainerStarted(ContainerId containerId,
057 *       Map<String, ByteBuffer> allServiceResponse) {
058 *     [post process after the container is started, process the response]
059 *   }
 *
061 *   public void onContainerResourceIncreased(ContainerId containerId,
062 *       Resource resource) {
063 *     [post process after the container resource is increased]
064 *   }
065 *
066 *   public void onContainerStatusReceived(ContainerId containerId,
067 *       ContainerStatus containerStatus) {
068 *     [make use of the status of the container]
069 *   }
070 *
071 *   public void onContainerStopped(ContainerId containerId) {
072 *     [post process after the container is stopped]
073 *   }
074 *
075 *   public void onStartContainerError(
076 *       ContainerId containerId, Throwable t) {
077 *     [handle the raised exception]
078 *   }
079 *
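 *   public void onGetContainerStatusError(
 *       ContainerId containerId, Throwable t) {
 *     [handle the raised exception]
 *   }
 *
 *   public void onIncreaseContainerResourceError(
 *       ContainerId containerId, Throwable t) {
 *     [handle the raised exception]
 *   }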
084 *
085 *   public void onStopContainerError(
086 *       ContainerId containerId, Throwable t) {
087 *     [handle the raised exception]
088 *   }
089 * }
090 * }
091 * </pre>
092 *
093 * The client's life-cycle should be managed like the following:
094 *
095 * <pre>
096 * {@code
 * NMClientAsync asyncClient =
 *     NMClientAsync.createNMClientAsync(new MyCallbackHandler());
 * asyncClient.init(conf);
 * asyncClient.start();
 * asyncClient.startContainerAsync(container, containerLaunchContext);
 * [... wait for container being started]
 * asyncClient.getContainerStatusAsync(container.getId(),
 *     container.getNodeId());
 * [... handle the status in the callback instance]
 * asyncClient.stopContainerAsync(container.getId(),
 *     container.getNodeId());
 * [... wait for container being stopped]
 * asyncClient.stop();
110 * }
111 * </pre>
112 */
113@Public
114@Stable
115public abstract class NMClientAsync extends AbstractService {
116
117  protected NMClient client;
118  protected CallbackHandler callbackHandler;
119
120  public static NMClientAsync createNMClientAsync(
121      AbstractCallbackHandler callbackHandler) {
122    return new NMClientAsyncImpl(callbackHandler);
123  }
124
125  protected NMClientAsync(AbstractCallbackHandler callbackHandler) {
126    this (NMClientAsync.class.getName(), callbackHandler);
127  }
128
129  protected NMClientAsync(
130      String name, AbstractCallbackHandler callbackHandler) {
131    this (name, new NMClientImpl(), callbackHandler);
132  }
133
134  protected NMClientAsync(String name, NMClient client,
135      AbstractCallbackHandler callbackHandler) {
136    super(name);
137    this.setClient(client);
138    this.setCallbackHandler(callbackHandler);
139  }
140
141  /**
142   * @deprecated Use {@link #createNMClientAsync(AbstractCallbackHandler)}
143   *             instead.
144   */
145  @Deprecated
146  public static NMClientAsync createNMClientAsync(
147      CallbackHandler callbackHandler) {
148    return new NMClientAsyncImpl(callbackHandler);
149  }
150
151  /**
152   * @deprecated Use {@link #NMClientAsync(AbstractCallbackHandler)}
153   *             instead.
154   */
155  @Deprecated
156  protected NMClientAsync(CallbackHandler callbackHandler) {
157    this (NMClientAsync.class.getName(), callbackHandler);
158  }
159
160  /**
161   * @deprecated Use {@link #NMClientAsync(String, AbstractCallbackHandler)}
162   *             instead.
163   */
164  @Deprecated
165  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
166    this (name, new NMClientImpl(), callbackHandler);
167  }
168
169  @Private
170  @VisibleForTesting
171  @Deprecated
172  protected NMClientAsync(String name, NMClient client,
173      CallbackHandler callbackHandler) {
174    super(name);
175    this.setClient(client);
176    this.setCallbackHandler(callbackHandler);
177  }
178
179  public abstract void startContainerAsync(
180      Container container, ContainerLaunchContext containerLaunchContext);
181
182  public abstract void increaseContainerResourceAsync(Container container);
183
184  public abstract void stopContainerAsync(
185      ContainerId containerId, NodeId nodeId);
186
187  public abstract void getContainerStatusAsync(
188      ContainerId containerId, NodeId nodeId);
189  
190  public NMClient getClient() {
191    return client;
192  }
193
194  public void setClient(NMClient client) {
195    this.client = client;
196  }
197
198  public CallbackHandler getCallbackHandler() {
199    return callbackHandler;
200  }
201
202  public void setCallbackHandler(CallbackHandler callbackHandler) {
203    this.callbackHandler = callbackHandler;
204  }
205
206  /**
207   * <p>
208   * The callback abstract class. The callback functions need to be implemented
209   * by {@link NMClientAsync} users. The APIs are called when responses from
210   * <code>NodeManager</code> are available.
211   * </p>
212   *
213   * <p>
214   * Once a callback happens, the users can chose to act on it in blocking or
215   * non-blocking manner. If the action on callback is done in a blocking
216   * manner, some of the threads performing requests on NodeManagers may get
217   * blocked depending on how many threads in the pool are busy.
218   * </p>
219   *
220   * <p>
221   * The implementation of the callback functions should not throw the
222   * unexpected exception. Otherwise, {@link NMClientAsync} will just
223   * catch, log and then ignore it.
224   * </p>
225   */
226  public abstract static class AbstractCallbackHandler
227      implements CallbackHandler {
228    /**
229     * The API is called when <code>NodeManager</code> responds to indicate its
230     * acceptance of the starting container request.
231     *
232     * @param containerId the Id of the container
233     * @param allServiceResponse a Map between the auxiliary service names and
234     *                           their outputs
235     */
236    public abstract void onContainerStarted(ContainerId containerId,
237        Map<String, ByteBuffer> allServiceResponse);
238
239    /**
240     * The API is called when <code>NodeManager</code> responds with the status
241     * of the container.
242     *
243     * @param containerId the Id of the container
244     * @param containerStatus the status of the container
245     */
246    public abstract void onContainerStatusReceived(ContainerId containerId,
247        ContainerStatus containerStatus);
248
249    /**
250     * The API is called when <code>NodeManager</code> responds to indicate the
251     * container is stopped.
252     *
253     * @param containerId the Id of the container
254     */
255    public abstract void onContainerStopped(ContainerId containerId);
256
257    /**
258     * The API is called when an exception is raised in the process of
259     * starting a container.
260     *
261     * @param containerId the Id of the container
262     * @param t the raised exception
263     */
264    public abstract void onStartContainerError(
265        ContainerId containerId, Throwable t);
266
267    /**
268     * The API is called when <code>NodeManager</code> responds to indicate
269     * the container resource has been successfully increased.
270     *
271     * @param containerId the Id of the container
272     * @param resource the target resource of the container
273     */
274    public abstract void onContainerResourceIncreased(
275        ContainerId containerId, Resource resource);
276
277    /**
278     * The API is called when an exception is raised in the process of
279     * querying the status of a container.
280     *
281     * @param containerId the Id of the container
282     * @param t the raised exception
283     */
284    public abstract void onGetContainerStatusError(
285        ContainerId containerId, Throwable t);
286
287    /**
288     * The API is called when an exception is raised in the process of
289     * increasing container resource.
290     *
291     * @param containerId the Id of the container
292     * @param t the raised exception
293     */
294    public abstract void onIncreaseContainerResourceError(
295        ContainerId containerId, Throwable t);
296
297    /**
298     * The API is called when an exception is raised in the process of
299     * stopping a container.
300     *
301     * @param containerId the Id of the container
302     * @param t the raised exception
303     */
304    public abstract void onStopContainerError(
305        ContainerId containerId, Throwable t);
306  }
307
308  /**
309   * @deprecated Use {@link NMClientAsync.AbstractCallbackHandler} instead.
310   *
311   * <p>
312   * The callback interface needs to be implemented by {@link NMClientAsync}
313   * users. The APIs are called when responses from <code>NodeManager</code> are
314   * available.
315   * </p>
316   *
317   * <p>
318   * Once a callback happens, the users can chose to act on it in blocking or
319   * non-blocking manner. If the action on callback is done in a blocking
320   * manner, some of the threads performing requests on NodeManagers may get
321   * blocked depending on how many threads in the pool are busy.
322   * </p>
323   *
324   * <p>
325   * The implementation of the callback function should not throw the
326   * unexpected exception. Otherwise, {@link NMClientAsync} will just
327   * catch, log and then ignore it.
328   * </p>
329   */
330  @Deprecated
331  public static interface CallbackHandler {
332    /**
333     * The API is called when <code>NodeManager</code> responds to indicate its
334     * acceptance of the starting container request
335     * @param containerId the Id of the container
336     * @param allServiceResponse a Map between the auxiliary service names and
337     *                           their outputs
338     */
339    void onContainerStarted(ContainerId containerId,
340        Map<String, ByteBuffer> allServiceResponse);
341
342    /**
343     * The API is called when <code>NodeManager</code> responds with the status
344     * of the container
345     * @param containerId the Id of the container
346     * @param containerStatus the status of the container
347     */
348    void onContainerStatusReceived(ContainerId containerId,
349        ContainerStatus containerStatus);
350
351    /**
352     * The API is called when <code>NodeManager</code> responds to indicate the
353     * container is stopped.
354     * @param containerId the Id of the container
355     */
356    void onContainerStopped(ContainerId containerId);
357
358    /**
359     * The API is called when an exception is raised in the process of
360     * starting a container
361     *
362     * @param containerId the Id of the container
363     * @param t the raised exception
364     */
365    void onStartContainerError(ContainerId containerId, Throwable t);
366
367    /**
368     * The API is called when an exception is raised in the process of
369     * querying the status of a container
370     *
371     * @param containerId the Id of the container
372     * @param t the raised exception
373     */
374    void onGetContainerStatusError(ContainerId containerId, Throwable t);
375
376    /**
377     * The API is called when an exception is raised in the process of
378     * stopping a container
379     *
380     * @param containerId the Id of the container
381     * @param t the raised exception
382     */
383    void onStopContainerError(ContainerId containerId, Throwable t);
384
385  }
386
387}