001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.client.api.async;
020    
021    import java.nio.ByteBuffer;
022    import java.util.Map;
023    import java.util.concurrent.ConcurrentMap;
024    
025    import org.apache.hadoop.classification.InterfaceAudience.Private;
026    import org.apache.hadoop.classification.InterfaceAudience.Public;
027    import org.apache.hadoop.classification.InterfaceStability.Stable;
028    import org.apache.hadoop.service.AbstractService;
029    import org.apache.hadoop.yarn.api.records.Container;
030    import org.apache.hadoop.yarn.api.records.ContainerId;
031    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
032    import org.apache.hadoop.yarn.api.records.ContainerStatus;
033    import org.apache.hadoop.yarn.api.records.NodeId;
034    import org.apache.hadoop.yarn.api.records.Token;
035    import org.apache.hadoop.yarn.client.api.NMClient;
036    import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
037    import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
038    import org.apache.hadoop.yarn.conf.YarnConfiguration;
039    
040    import com.google.common.annotations.VisibleForTesting;
041    
042    /**
043     * <code>NMClientAsync</code> handles communication with all the NodeManagers
044     * and provides asynchronous updates on getting responses from them. It
045     * maintains a thread pool to communicate with individual NMs where a number of
046     * worker threads process requests to NMs by using {@link NMClientImpl}. The max
047     * size of the thread pool is configurable through
048     * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
049     *
050     * It should be used in conjunction with a CallbackHandler. For example
051     *
052     * <pre>
053     * {@code
054     * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
055     *   public void onContainerStarted(ContainerId containerId,
056     *       Map<String, ByteBuffer> allServiceResponse) {
057     *     [post process after the container is started, process the response]
058     *   }
059     *
060     *   public void onContainerStatusReceived(ContainerId containerId,
061     *       ContainerStatus containerStatus) {
062     *     [make use of the status of the container]
063     *   }
064     *
065     *   public void onContainerStopped(ContainerId containerId) {
066     *     [post process after the container is stopped]
067     *   }
068     *
069     *   public void onStartContainerError(
070     *       ContainerId containerId, Throwable t) {
071     *     [handle the raised exception]
072     *   }
073     *
074     *   public void onGetContainerStatusError(
075     *       ContainerId containerId, Throwable t) {
076     *     [handle the raised exception]
077     *   }
078     *
079     *   public void onStopContainerError(
080     *       ContainerId containerId, Throwable t) {
081     *     [handle the raised exception]
082     *   }
083     * }
084     * }
085     * </pre>
086     *
087     * The client's life-cycle should be managed like the following:
088     *
089     * <pre>
090     * {@code
091     * NMClientAsync asyncClient = 
092     *     NMClientAsync.createNMClientAsync(new MyCallbackhandler());
093     * asyncClient.init(conf);
094     * asyncClient.start();
095     * asyncClient.startContainer(container, containerLaunchContext);
096     * [... wait for container being started]
097     * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
098     *     container.getContainerToken());
099     * [... handle the status in the callback instance]
100     * asyncClient.stopContainer(container.getId(), container.getNodeId(),
101     *     container.getContainerToken());
102     * [... wait for container being stopped]
103     * asyncClient.stop();
104     * }
105     * </pre>
106     */
107    @Public
108    @Stable
109    public abstract class NMClientAsync extends AbstractService {
110    
111      protected NMClient client;
112      protected CallbackHandler callbackHandler;
113    
114      public static NMClientAsync createNMClientAsync(
115          CallbackHandler callbackHandler) {
116        return new NMClientAsyncImpl(callbackHandler);
117      }
118      
119      protected NMClientAsync(CallbackHandler callbackHandler) {
120        this (NMClientAsync.class.getName(), callbackHandler);
121      }
122    
123      protected NMClientAsync(String name, CallbackHandler callbackHandler) {
124        this (name, new NMClientImpl(), callbackHandler);
125      }
126    
127      @Private
128      @VisibleForTesting
129      protected NMClientAsync(String name, NMClient client,
130          CallbackHandler callbackHandler) {
131        super(name);
132        this.setClient(client);
133        this.setCallbackHandler(callbackHandler);
134      }
135    
136      public abstract void startContainerAsync(
137          Container container, ContainerLaunchContext containerLaunchContext);
138    
139      public abstract void stopContainerAsync(
140          ContainerId containerId, NodeId nodeId);
141    
142      public abstract void getContainerStatusAsync(
143          ContainerId containerId, NodeId nodeId);
144      
145      public NMClient getClient() {
146        return client;
147      }
148    
149      public void setClient(NMClient client) {
150        this.client = client;
151      }
152    
153      public CallbackHandler getCallbackHandler() {
154        return callbackHandler;
155      }
156    
157      public void setCallbackHandler(CallbackHandler callbackHandler) {
158        this.callbackHandler = callbackHandler;
159      }
160    
161      /**
162       * <p>
163       * The callback interface needs to be implemented by {@link NMClientAsync}
164       * users. The APIs are called when responses from <code>NodeManager</code> are
165       * available.
166       * </p>
167       *
168       * <p>
169       * Once a callback happens, the users can chose to act on it in blocking or
170       * non-blocking manner. If the action on callback is done in a blocking
171       * manner, some of the threads performing requests on NodeManagers may get
172       * blocked depending on how many threads in the pool are busy.
173       * </p>
174       *
175       * <p>
176       * The implementation of the callback function should not throw the
177       * unexpected exception. Otherwise, {@link NMClientAsync} will just
178       * catch, log and then ignore it.
179       * </p>
180       */
181      public static interface CallbackHandler {
182        /**
183         * The API is called when <code>NodeManager</code> responds to indicate its
184         * acceptance of the starting container request
185         * @param containerId the Id of the container
186         * @param allServiceResponse a Map between the auxiliary service names and
187         *                           their outputs
188         */
189        void onContainerStarted(ContainerId containerId,
190            Map<String, ByteBuffer> allServiceResponse);
191    
192        /**
193         * The API is called when <code>NodeManager</code> responds with the status
194         * of the container
195         * @param containerId the Id of the container
196         * @param containerStatus the status of the container
197         */
198        void onContainerStatusReceived(ContainerId containerId,
199            ContainerStatus containerStatus);
200    
201        /**
202         * The API is called when <code>NodeManager</code> responds to indicate the
203         * container is stopped.
204         * @param containerId the Id of the container
205         */
206        void onContainerStopped(ContainerId containerId);
207    
208        /**
209         * The API is called when an exception is raised in the process of
210         * starting a container
211         *
212         * @param containerId the Id of the container
213         * @param t the raised exception
214         */
215        void onStartContainerError(ContainerId containerId, Throwable t);
216    
217        /**
218         * The API is called when an exception is raised in the process of
219         * querying the status of a container
220         *
221         * @param containerId the Id of the container
222         * @param t the raised exception
223         */
224        void onGetContainerStatusError(ContainerId containerId, Throwable t);
225    
226        /**
227         * The API is called when an exception is raised in the process of
228         * stopping a container
229         *
230         * @param containerId the Id of the container
231         * @param t the raised exception
232         */
233        void onStopContainerError(ContainerId containerId, Throwable t);
234    
235      }
236    
237    }