001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.client.api.async; 020 021import java.nio.ByteBuffer; 022import java.util.Map; 023import java.util.concurrent.ConcurrentMap; 024 025import org.apache.hadoop.classification.InterfaceAudience.Private; 026import org.apache.hadoop.classification.InterfaceAudience.Public; 027import org.apache.hadoop.classification.InterfaceStability.Stable; 028import org.apache.hadoop.service.AbstractService; 029import org.apache.hadoop.yarn.api.records.Container; 030import org.apache.hadoop.yarn.api.records.ContainerId; 031import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 032import org.apache.hadoop.yarn.api.records.ContainerStatus; 033import org.apache.hadoop.yarn.api.records.NodeId; 034import org.apache.hadoop.yarn.api.records.Resource; 035import org.apache.hadoop.yarn.api.records.Token; 036import org.apache.hadoop.yarn.client.api.NMClient; 037import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 038import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; 039import org.apache.hadoop.yarn.conf.YarnConfiguration; 040 041import com.google.common.annotations.VisibleForTesting; 042 043/** 044 * <code>NMClientAsync</code> handles communication with all the NodeManagers 045 * and provides asynchronous updates on getting responses from them. It 046 * maintains a thread pool to communicate with individual NMs where a number of 047 * worker threads process requests to NMs by using {@link NMClientImpl}. The max 048 * size of the thread pool is configurable through 049 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. 050 * 051 * It should be used in conjunction with a CallbackHandler. For example 052 * 053 * <pre> 054 * {@code 055 * class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler { 056 * public void onContainerStarted(ContainerId containerId, 057 * Map<String, ByteBuffer> allServiceResponse) { 058 * [post process after the container is started, process the response] 059 * } 060 061 * public void onContainerResourceIncreased(ContainerId containerId, 062 * Resource resource) { 063 * [post process after the container resource is increased] 064 * } 065 * 066 * public void onContainerStatusReceived(ContainerId containerId, 067 * ContainerStatus containerStatus) { 068 * [make use of the status of the container] 069 * } 070 * 071 * public void onContainerStopped(ContainerId containerId) { 072 * [post process after the container is stopped] 073 * } 074 * 075 * public void onStartContainerError( 076 * ContainerId containerId, Throwable t) { 077 * [handle the raised exception] 078 * } 079 * 080 * public void onGetContainerStatusError( 081 * ContainerId containerId, Throwable t) { 082 * [handle the raised exception] 083 * } 084 * 085 * public void onStopContainerError( 086 * ContainerId containerId, Throwable t) { 087 * [handle the raised exception] 088 * } 089 * } 090 * } 091 * </pre> 092 * 093 * The client's life-cycle should be managed like the following: 094 * 095 * <pre> 096 * {@code 097 * NMClientAsync asyncClient = 098 * NMClientAsync.createNMClientAsync(new MyCallbackhandler()); 099 * asyncClient.init(conf); 100 * asyncClient.start(); 101 * asyncClient.startContainer(container, containerLaunchContext); 102 * [... wait for container being started] 103 * asyncClient.getContainerStatus(container.getId(), container.getNodeId(), 104 * container.getContainerToken()); 105 * [... handle the status in the callback instance] 106 * asyncClient.stopContainer(container.getId(), container.getNodeId(), 107 * container.getContainerToken()); 108 * [... wait for container being stopped] 109 * asyncClient.stop(); 110 * } 111 * </pre> 112 */ 113@Public 114@Stable 115public abstract class NMClientAsync extends AbstractService { 116 117 protected NMClient client; 118 protected CallbackHandler callbackHandler; 119 120 public static NMClientAsync createNMClientAsync( 121 AbstractCallbackHandler callbackHandler) { 122 return new NMClientAsyncImpl(callbackHandler); 123 } 124 125 protected NMClientAsync(AbstractCallbackHandler callbackHandler) { 126 this (NMClientAsync.class.getName(), callbackHandler); 127 } 128 129 protected NMClientAsync( 130 String name, AbstractCallbackHandler callbackHandler) { 131 this (name, new NMClientImpl(), callbackHandler); 132 } 133 134 protected NMClientAsync(String name, NMClient client, 135 AbstractCallbackHandler callbackHandler) { 136 super(name); 137 this.setClient(client); 138 this.setCallbackHandler(callbackHandler); 139 } 140 141 /** 142 * @deprecated Use {@link #createNMClientAsync(AbstractCallbackHandler)} 143 * instead. 144 */ 145 @Deprecated 146 public static NMClientAsync createNMClientAsync( 147 CallbackHandler callbackHandler) { 148 return new NMClientAsyncImpl(callbackHandler); 149 } 150 151 /** 152 * @deprecated Use {@link #NMClientAsync(AbstractCallbackHandler)} 153 * instead. 154 */ 155 @Deprecated 156 protected NMClientAsync(CallbackHandler callbackHandler) { 157 this (NMClientAsync.class.getName(), callbackHandler); 158 } 159 160 /** 161 * @deprecated Use {@link #NMClientAsync(String, AbstractCallbackHandler)} 162 * instead. 163 */ 164 @Deprecated 165 protected NMClientAsync(String name, CallbackHandler callbackHandler) { 166 this (name, new NMClientImpl(), callbackHandler); 167 } 168 169 @Private 170 @VisibleForTesting 171 @Deprecated 172 protected NMClientAsync(String name, NMClient client, 173 CallbackHandler callbackHandler) { 174 super(name); 175 this.setClient(client); 176 this.setCallbackHandler(callbackHandler); 177 } 178 179 public abstract void startContainerAsync( 180 Container container, ContainerLaunchContext containerLaunchContext); 181 182 public abstract void increaseContainerResourceAsync(Container container); 183 184 public abstract void stopContainerAsync( 185 ContainerId containerId, NodeId nodeId); 186 187 public abstract void getContainerStatusAsync( 188 ContainerId containerId, NodeId nodeId); 189 190 public NMClient getClient() { 191 return client; 192 } 193 194 public void setClient(NMClient client) { 195 this.client = client; 196 } 197 198 public CallbackHandler getCallbackHandler() { 199 return callbackHandler; 200 } 201 202 public void setCallbackHandler(CallbackHandler callbackHandler) { 203 this.callbackHandler = callbackHandler; 204 } 205 206 /** 207 * <p> 208 * The callback abstract class. The callback functions need to be implemented 209 * by {@link NMClientAsync} users. The APIs are called when responses from 210 * <code>NodeManager</code> are available. 211 * </p> 212 * 213 * <p> 214 * Once a callback happens, the users can chose to act on it in blocking or 215 * non-blocking manner. If the action on callback is done in a blocking 216 * manner, some of the threads performing requests on NodeManagers may get 217 * blocked depending on how many threads in the pool are busy. 218 * </p> 219 * 220 * <p> 221 * The implementation of the callback functions should not throw the 222 * unexpected exception. Otherwise, {@link NMClientAsync} will just 223 * catch, log and then ignore it. 224 * </p> 225 */ 226 public abstract static class AbstractCallbackHandler 227 implements CallbackHandler { 228 /** 229 * The API is called when <code>NodeManager</code> responds to indicate its 230 * acceptance of the starting container request. 231 * 232 * @param containerId the Id of the container 233 * @param allServiceResponse a Map between the auxiliary service names and 234 * their outputs 235 */ 236 public abstract void onContainerStarted(ContainerId containerId, 237 Map<String, ByteBuffer> allServiceResponse); 238 239 /** 240 * The API is called when <code>NodeManager</code> responds with the status 241 * of the container. 242 * 243 * @param containerId the Id of the container 244 * @param containerStatus the status of the container 245 */ 246 public abstract void onContainerStatusReceived(ContainerId containerId, 247 ContainerStatus containerStatus); 248 249 /** 250 * The API is called when <code>NodeManager</code> responds to indicate the 251 * container is stopped. 252 * 253 * @param containerId the Id of the container 254 */ 255 public abstract void onContainerStopped(ContainerId containerId); 256 257 /** 258 * The API is called when an exception is raised in the process of 259 * starting a container. 260 * 261 * @param containerId the Id of the container 262 * @param t the raised exception 263 */ 264 public abstract void onStartContainerError( 265 ContainerId containerId, Throwable t); 266 267 /** 268 * The API is called when <code>NodeManager</code> responds to indicate 269 * the container resource has been successfully increased. 270 * 271 * @param containerId the Id of the container 272 * @param resource the target resource of the container 273 */ 274 public abstract void onContainerResourceIncreased( 275 ContainerId containerId, Resource resource); 276 277 /** 278 * The API is called when an exception is raised in the process of 279 * querying the status of a container. 280 * 281 * @param containerId the Id of the container 282 * @param t the raised exception 283 */ 284 public abstract void onGetContainerStatusError( 285 ContainerId containerId, Throwable t); 286 287 /** 288 * The API is called when an exception is raised in the process of 289 * increasing container resource. 290 * 291 * @param containerId the Id of the container 292 * @param t the raised exception 293 */ 294 public abstract void onIncreaseContainerResourceError( 295 ContainerId containerId, Throwable t); 296 297 /** 298 * The API is called when an exception is raised in the process of 299 * stopping a container. 300 * 301 * @param containerId the Id of the container 302 * @param t the raised exception 303 */ 304 public abstract void onStopContainerError( 305 ContainerId containerId, Throwable t); 306 } 307 308 /** 309 * @deprecated Use {@link NMClientAsync.AbstractCallbackHandler} instead. 310 * 311 * <p> 312 * The callback interface needs to be implemented by {@link NMClientAsync} 313 * users. The APIs are called when responses from <code>NodeManager</code> are 314 * available. 315 * </p> 316 * 317 * <p> 318 * Once a callback happens, the users can chose to act on it in blocking or 319 * non-blocking manner. If the action on callback is done in a blocking 320 * manner, some of the threads performing requests on NodeManagers may get 321 * blocked depending on how many threads in the pool are busy. 322 * </p> 323 * 324 * <p> 325 * The implementation of the callback function should not throw the 326 * unexpected exception. Otherwise, {@link NMClientAsync} will just 327 * catch, log and then ignore it. 328 * </p> 329 */ 330 @Deprecated 331 public static interface CallbackHandler { 332 /** 333 * The API is called when <code>NodeManager</code> responds to indicate its 334 * acceptance of the starting container request 335 * @param containerId the Id of the container 336 * @param allServiceResponse a Map between the auxiliary service names and 337 * their outputs 338 */ 339 void onContainerStarted(ContainerId containerId, 340 Map<String, ByteBuffer> allServiceResponse); 341 342 /** 343 * The API is called when <code>NodeManager</code> responds with the status 344 * of the container 345 * @param containerId the Id of the container 346 * @param containerStatus the status of the container 347 */ 348 void onContainerStatusReceived(ContainerId containerId, 349 ContainerStatus containerStatus); 350 351 /** 352 * The API is called when <code>NodeManager</code> responds to indicate the 353 * container is stopped. 354 * @param containerId the Id of the container 355 */ 356 void onContainerStopped(ContainerId containerId); 357 358 /** 359 * The API is called when an exception is raised in the process of 360 * starting a container 361 * 362 * @param containerId the Id of the container 363 * @param t the raised exception 364 */ 365 void onStartContainerError(ContainerId containerId, Throwable t); 366 367 /** 368 * The API is called when an exception is raised in the process of 369 * querying the status of a container 370 * 371 * @param containerId the Id of the container 372 * @param t the raised exception 373 */ 374 void onGetContainerStatusError(ContainerId containerId, Throwable t); 375 376 /** 377 * The API is called when an exception is raised in the process of 378 * stopping a container 379 * 380 * @param containerId the Id of the container 381 * @param t the raised exception 382 */ 383 void onStopContainerError(ContainerId containerId, Throwable t); 384 385 } 386 387}