001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client.api.async; 020 021import com.google.common.base.Preconditions; 022import com.google.common.base.Supplier; 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Private; 031import org.apache.hadoop.classification.InterfaceAudience.Public; 032import org.apache.hadoop.classification.InterfaceStability.Unstable; 033import org.apache.hadoop.classification.InterfaceStability.Stable; 034import org.apache.hadoop.service.AbstractService; 035import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 036import org.apache.hadoop.yarn.api.records.Container; 037import org.apache.hadoop.yarn.api.records.ContainerId; 038import org.apache.hadoop.yarn.api.records.ContainerStatus; 039import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 040import org.apache.hadoop.yarn.api.records.NodeReport; 041import org.apache.hadoop.yarn.api.records.Priority; 042import org.apache.hadoop.yarn.api.records.Resource; 043import org.apache.hadoop.yarn.api.records.UpdatedContainer; 044import org.apache.hadoop.yarn.client.api.AMRMClient; 045import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 046import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; 047import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 048import org.apache.hadoop.yarn.exceptions.YarnException; 049 050import com.google.common.annotations.VisibleForTesting; 051 052/** 053 * <code>AMRMClientAsync</code> handles communication with the ResourceManager 054 * and provides asynchronous updates on events such as container allocations and 055 * completions. It contains a thread that sends periodic heartbeats to the 056 * ResourceManager. 057 * 058 * It should be used by implementing a CallbackHandler: 059 * <pre> 060 * {@code 061 * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { 062 * public void onContainersAllocated(List<Container> containers) { 063 * [run tasks on the containers] 064 * } 065 * 066 * public void onContainersUpdated(List<Container> containers) { 067 * [determine if resource allocation of containers have been increased in 068 * the ResourceManager, and if so, inform the NodeManagers to increase the 069 * resource monitor/enforcement on the containers] 070 * } 071 * 072 * public void onContainersCompleted(List<ContainerStatus> statuses) { 073 * [update progress, check whether app is done] 074 * } 075 * 076 * public void onNodesUpdated(List<NodeReport> updated) {} 077 * 078 * public void onReboot() {} 079 * } 080 * } 081 * </pre> 082 * 083 * The client's lifecycle should be managed similarly to the following: 084 * 085 * <pre> 086 * {@code 087 * AMRMClientAsync asyncClient = 088 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); 089 * asyncClient.init(conf); 090 * asyncClient.start(); 091 * RegisterApplicationMasterResponse response = asyncClient 092 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 093 * appMasterTrackingUrl); 094 * asyncClient.addContainerRequest(containerRequest); 095 * [... wait for application to complete] 096 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); 097 * asyncClient.stop(); 098 * } 099 * </pre> 100 */ 101@Public 102@Stable 103public abstract class AMRMClientAsync<T extends ContainerRequest> 104extends AbstractService { 105 private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); 106 107 protected final AMRMClient<T> client; 108 protected final CallbackHandler handler; 109 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); 110 111 /** 112 * <p>Create a new instance of AMRMClientAsync.</p> 113 * 114 * @param intervalMs heartbeat interval in milliseconds between AM and RM 115 * @param callbackHandler callback handler that processes responses from 116 * the <code>ResourceManager</code> 117 */ 118 public static <T extends ContainerRequest> AMRMClientAsync<T> 119 createAMRMClientAsync( 120 int intervalMs, AbstractCallbackHandler callbackHandler) { 121 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); 122 } 123 124 /** 125 * <p>Create a new instance of AMRMClientAsync.</p> 126 * 127 * @param client the AMRMClient instance 128 * @param intervalMs heartbeat interval in milliseconds between AM and RM 129 * @param callbackHandler callback handler that processes responses from 130 * the <code>ResourceManager</code> 131 */ 132 public static <T extends ContainerRequest> AMRMClientAsync<T> 133 createAMRMClientAsync( 134 AMRMClient<T> client, int intervalMs, 135 AbstractCallbackHandler callbackHandler) { 136 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); 137 } 138 139 protected AMRMClientAsync( 140 int intervalMs, AbstractCallbackHandler callbackHandler) { 141 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); 142 } 143 144 @Private 145 @VisibleForTesting 146 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, 147 AbstractCallbackHandler callbackHandler) { 148 super(AMRMClientAsync.class.getName()); 149 this.client = client; 150 this.heartbeatIntervalMs.set(intervalMs); 151 this.handler = callbackHandler; 152 } 153 154 /** 155 * 156 * @deprecated Use {@link #createAMRMClientAsync(int, 157 * AMRMClientAsync.AbstractCallbackHandler)} instead. 158 */ 159 @Deprecated 160 public static <T extends ContainerRequest> AMRMClientAsync<T> 161 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 162 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); 163 } 164 165 /** 166 * 167 * @deprecated Use {@link #createAMRMClientAsync(AMRMClient, 168 * int, AMRMClientAsync.AbstractCallbackHandler)} instead. 169 */ 170 @Deprecated 171 public static <T extends ContainerRequest> AMRMClientAsync<T> 172 createAMRMClientAsync(AMRMClient<T> client, int intervalMs, 173 CallbackHandler callbackHandler) { 174 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); 175 } 176 177 @Deprecated 178 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 179 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); 180 } 181 182 @Private 183 @VisibleForTesting 184 @Deprecated 185 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, 186 CallbackHandler callbackHandler) { 187 super(AMRMClientAsync.class.getName()); 188 this.client = client; 189 this.heartbeatIntervalMs.set(intervalMs); 190 this.handler = callbackHandler; 191 } 192 193 public void setHeartbeatInterval(int interval) { 194 heartbeatIntervalMs.set(interval); 195 } 196 197 public abstract List<? extends Collection<T>> getMatchingRequests( 198 Priority priority, 199 String resourceName, 200 Resource capability); 201 202 /** 203 * Registers this application master with the resource manager. On successful 204 * registration, starts the heartbeating thread. 205 * @throws YarnException 206 * @throws IOException 207 */ 208 public abstract RegisterApplicationMasterResponse registerApplicationMaster( 209 String appHostName, int appHostPort, String appTrackingUrl) 210 throws YarnException, IOException; 211 212 /** 213 * Unregister the application master. This must be called in the end. 214 * @param appStatus Success/Failure status of the master 215 * @param appMessage Diagnostics message on failure 216 * @param appTrackingUrl New URL to get master info 217 * @throws YarnException 218 * @throws IOException 219 */ 220 public abstract void unregisterApplicationMaster( 221 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 222 throws YarnException, IOException; 223 224 /** 225 * Request containers for resources before calling <code>allocate</code> 226 * @param req Resource request 227 */ 228 public abstract void addContainerRequest(T req); 229 230 /** 231 * Remove previous container request. The previous container request may have 232 * already been sent to the ResourceManager. So even after the remove request 233 * the app must be prepared to receive an allocation for the previous request 234 * even after the remove request 235 * @param req Resource request 236 */ 237 public abstract void removeContainerRequest(T req); 238 239 /** 240 * Request container resource change before calling <code>allocate</code>. 241 * Any previous pending resource change request of the same container will be 242 * removed. 243 * 244 * Application that calls this method is expected to maintain the 245 * <code>Container</code>s that are returned from previous successful 246 * allocations or resource changes. By passing in the existing container and a 247 * target resource capability to this method, the application requests the 248 * ResourceManager to change the existing resource allocation to the target 249 * resource allocation. 250 * 251 * @param container The container returned from the last successful resource 252 * allocation or resource change 253 * @param capability The target resource capability of the container 254 */ 255 public abstract void requestContainerResourceChange( 256 Container container, Resource capability); 257 258 /** 259 * Release containers assigned by the Resource Manager. If the app cannot use 260 * the container or wants to give up the container then it can release them. 261 * The app needs to make new requests for the released resource capability if 262 * it still needs it. eg. it released non-local resources 263 * @param containerId 264 */ 265 public abstract void releaseAssignedContainer(ContainerId containerId); 266 267 /** 268 * Get the currently available resources in the cluster. 269 * A valid value is available after a call to allocate has been made 270 * @return Currently available resources 271 */ 272 public abstract Resource getAvailableResources(); 273 274 /** 275 * Get the current number of nodes in the cluster. 276 * A valid values is available after a call to allocate has been made 277 * @return Current number of nodes in the cluster 278 */ 279 public abstract int getClusterNodeCount(); 280 281 /** 282 * Update application's blacklist with addition or removal resources. 283 * 284 * @param blacklistAdditions list of resources which should be added to the 285 * application blacklist 286 * @param blacklistRemovals list of resources which should be removed from the 287 * application blacklist 288 */ 289 public abstract void updateBlacklist(List<String> blacklistAdditions, 290 List<String> blacklistRemovals); 291 292 /** 293 * Wait for <code>check</code> to return true for each 1000 ms. 294 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 295 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 296 * @param check 297 */ 298 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 299 waitFor(check, 1000); 300 } 301 302 /** 303 * Wait for <code>check</code> to return true for each 304 * <code>checkEveryMillis</code> ms. 305 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 306 * @param check user defined checker 307 * @param checkEveryMillis interval to call <code>check</code> 308 */ 309 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 310 throws InterruptedException { 311 waitFor(check, checkEveryMillis, 1); 312 }; 313 314 /** 315 * Wait for <code>check</code> to return true for each 316 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 317 * the message "waiting in main loop" for each <code>logInterval</code> times 318 * iteration to confirm the thread is alive. 319 * @param check user defined checker 320 * @param checkEveryMillis interval to call <code>check</code> 321 * @param logInterval interval to log for each 322 */ 323 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 324 int logInterval) throws InterruptedException { 325 Preconditions.checkNotNull(check, "check should not be null"); 326 Preconditions.checkArgument(checkEveryMillis >= 0, 327 "checkEveryMillis should be positive value"); 328 Preconditions.checkArgument(logInterval >= 0, 329 "logInterval should be positive value"); 330 331 int loggingCounter = logInterval; 332 do { 333 if (LOG.isDebugEnabled()) { 334 LOG.debug("Check the condition for main loop."); 335 } 336 337 boolean result = check.get(); 338 if (result) { 339 LOG.info("Exits the main loop."); 340 return; 341 } 342 if (--loggingCounter <= 0) { 343 LOG.info("Waiting in main loop."); 344 loggingCounter = logInterval; 345 } 346 347 Thread.sleep(checkEveryMillis); 348 } while (true); 349 } 350 351 /** 352 * <p> 353 * The callback abstract class. The callback functions need to be implemented 354 * by {@link AMRMClientAsync} users. The APIs are called when responses from 355 * the <code>ResourceManager</code> are available. 356 * </p> 357 */ 358 public abstract static class AbstractCallbackHandler 359 implements CallbackHandler { 360 361 /** 362 * Called when the ResourceManager responds to a heartbeat with completed 363 * containers. If the response contains both completed containers and 364 * allocated containers, this will be called before containersAllocated. 365 */ 366 public abstract void onContainersCompleted(List<ContainerStatus> statuses); 367 368 /** 369 * Called when the ResourceManager responds to a heartbeat with allocated 370 * containers. If the response containers both completed containers and 371 * allocated containers, this will be called after containersCompleted. 372 */ 373 public abstract void onContainersAllocated(List<Container> containers); 374 375 /** 376 * Called when the ResourceManager responds to a heartbeat with containers 377 * whose resource allocation has been changed. 378 */ 379 @Public 380 @Unstable 381 public abstract void onContainersUpdated(List<UpdatedContainer> containers); 382 383 /** 384 * Called when the ResourceManager wants the ApplicationMaster to shutdown 385 * for being out of sync etc. The ApplicationMaster should not unregister 386 * with the RM unless the ApplicationMaster wants to be the last attempt. 387 */ 388 public abstract void onShutdownRequest(); 389 390 /** 391 * Called when nodes tracked by the ResourceManager have changed in health, 392 * availability etc. 393 */ 394 public abstract void onNodesUpdated(List<NodeReport> updatedNodes); 395 396 public abstract float getProgress(); 397 398 /** 399 * Called when error comes from RM communications as well as from errors in 400 * the callback itself from the app. Calling 401 * stop() is the recommended action. 402 */ 403 public abstract void onError(Throwable e); 404 } 405 406 /** 407 * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead. 408 */ 409 @Deprecated 410 public interface CallbackHandler { 411 412 /** 413 * Called when the ResourceManager responds to a heartbeat with completed 414 * containers. If the response contains both completed containers and 415 * allocated containers, this will be called before containersAllocated. 416 */ 417 void onContainersCompleted(List<ContainerStatus> statuses); 418 419 /** 420 * Called when the ResourceManager responds to a heartbeat with allocated 421 * containers. If the response containers both completed containers and 422 * allocated containers, this will be called after containersCompleted. 423 */ 424 void onContainersAllocated(List<Container> containers); 425 426 /** 427 * Called when the ResourceManager wants the ApplicationMaster to shutdown 428 * for being out of sync etc. The ApplicationMaster should not unregister 429 * with the RM unless the ApplicationMaster wants to be the last attempt. 430 */ 431 void onShutdownRequest(); 432 433 /** 434 * Called when nodes tracked by the ResourceManager have changed in health, 435 * availability etc. 436 */ 437 void onNodesUpdated(List<NodeReport> updatedNodes); 438 439 float getProgress(); 440 441 /** 442 * Called when error comes from RM communications as well as from errors in 443 * the callback itself from the app. Calling 444 * stop() is the recommended action. 445 * 446 * @param e 447 */ 448 void onError(Throwable e); 449 } 450}