001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client.api.async; 020 021import com.google.common.base.Preconditions; 022import com.google.common.base.Supplier; 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Private; 031import org.apache.hadoop.classification.InterfaceAudience.Public; 032import org.apache.hadoop.classification.InterfaceStability.Stable; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 035import org.apache.hadoop.yarn.api.records.Container; 036import org.apache.hadoop.yarn.api.records.ContainerId; 037import org.apache.hadoop.yarn.api.records.ContainerStatus; 038import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 039import org.apache.hadoop.yarn.api.records.NodeReport; 040import org.apache.hadoop.yarn.api.records.Priority; 041import org.apache.hadoop.yarn.api.records.Resource; 042import org.apache.hadoop.yarn.client.api.AMRMClient; 043import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 044import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; 045import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 046import org.apache.hadoop.yarn.exceptions.YarnException; 047 048import com.google.common.annotations.VisibleForTesting; 049 050/** 051 * <code>AMRMClientAsync</code> handles communication with the ResourceManager 052 * and provides asynchronous updates on events such as container allocations and 053 * completions. It contains a thread that sends periodic heartbeats to the 054 * ResourceManager. 055 * 056 * It should be used by implementing a CallbackHandler: 057 * <pre> 058 * {@code 059 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler { 060 * public void onContainersAllocated(List<Container> containers) { 061 * [run tasks on the containers] 062 * } 063 * 064 * public void onContainersCompleted(List<ContainerStatus> statuses) { 065 * [update progress, check whether app is done] 066 * } 067 * 068 * public void onNodesUpdated(List<NodeReport> updated) {} 069 * 070 * public void onReboot() {} 071 * } 072 * } 073 * </pre> 074 * 075 * The client's lifecycle should be managed similarly to the following: 076 * 077 * <pre> 078 * {@code 079 * AMRMClientAsync asyncClient = 080 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); 081 * asyncClient.init(conf); 082 * asyncClient.start(); 083 * RegisterApplicationMasterResponse response = asyncClient 084 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 085 * appMasterTrackingUrl); 086 * asyncClient.addContainerRequest(containerRequest); 087 * [... wait for application to complete] 088 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); 089 * asyncClient.stop(); 090 * } 091 * </pre> 092 */ 093@Public 094@Stable 095public abstract class AMRMClientAsync<T extends ContainerRequest> 096extends AbstractService { 097 private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); 098 099 protected final AMRMClient<T> client; 100 protected final CallbackHandler handler; 101 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); 102 103 public static <T extends ContainerRequest> AMRMClientAsync<T> 104 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 105 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); 106 } 107 108 public static <T extends ContainerRequest> AMRMClientAsync<T> 109 createAMRMClientAsync(AMRMClient<T> client, int intervalMs, 110 CallbackHandler callbackHandler) { 111 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); 112 } 113 114 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 115 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); 116 } 117 118 @Private 119 @VisibleForTesting 120 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, 121 CallbackHandler callbackHandler) { 122 super(AMRMClientAsync.class.getName()); 123 this.client = client; 124 this.heartbeatIntervalMs.set(intervalMs); 125 this.handler = callbackHandler; 126 } 127 128 public void setHeartbeatInterval(int interval) { 129 heartbeatIntervalMs.set(interval); 130 } 131 132 public abstract List<? extends Collection<T>> getMatchingRequests( 133 Priority priority, 134 String resourceName, 135 Resource capability); 136 137 /** 138 * Registers this application master with the resource manager. On successful 139 * registration, starts the heartbeating thread. 140 * @throws YarnException 141 * @throws IOException 142 */ 143 public abstract RegisterApplicationMasterResponse registerApplicationMaster( 144 String appHostName, int appHostPort, String appTrackingUrl) 145 throws YarnException, IOException; 146 147 /** 148 * Unregister the application master. This must be called in the end. 149 * @param appStatus Success/Failure status of the master 150 * @param appMessage Diagnostics message on failure 151 * @param appTrackingUrl New URL to get master info 152 * @throws YarnException 153 * @throws IOException 154 */ 155 public abstract void unregisterApplicationMaster( 156 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 157 throws YarnException, IOException; 158 159 /** 160 * Request containers for resources before calling <code>allocate</code> 161 * @param req Resource request 162 */ 163 public abstract void addContainerRequest(T req); 164 165 /** 166 * Remove previous container request. The previous container request may have 167 * already been sent to the ResourceManager. So even after the remove request 168 * the app must be prepared to receive an allocation for the previous request 169 * even after the remove request 170 * @param req Resource request 171 */ 172 public abstract void removeContainerRequest(T req); 173 174 /** 175 * Release containers assigned by the Resource Manager. If the app cannot use 176 * the container or wants to give up the container then it can release them. 177 * The app needs to make new requests for the released resource capability if 178 * it still needs it. eg. it released non-local resources 179 * @param containerId 180 */ 181 public abstract void releaseAssignedContainer(ContainerId containerId); 182 183 /** 184 * Get the currently available resources in the cluster. 185 * A valid value is available after a call to allocate has been made 186 * @return Currently available resources 187 */ 188 public abstract Resource getAvailableResources(); 189 190 /** 191 * Get the current number of nodes in the cluster. 192 * A valid values is available after a call to allocate has been made 193 * @return Current number of nodes in the cluster 194 */ 195 public abstract int getClusterNodeCount(); 196 197 /** 198 * Update application's blacklist with addition or removal resources. 199 * 200 * @param blacklistAdditions list of resources which should be added to the 201 * application blacklist 202 * @param blacklistRemovals list of resources which should be removed from the 203 * application blacklist 204 */ 205 public abstract void updateBlacklist(List<String> blacklistAdditions, 206 List<String> blacklistRemovals); 207 208 /** 209 * Wait for <code>check</code> to return true for each 1000 ms. 210 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 211 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 212 * @param check 213 */ 214 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 215 waitFor(check, 1000); 216 } 217 218 /** 219 * Wait for <code>check</code> to return true for each 220 * <code>checkEveryMillis</code> ms. 221 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 222 * @param check user defined checker 223 * @param checkEveryMillis interval to call <code>check</code> 224 */ 225 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 226 throws InterruptedException { 227 waitFor(check, checkEveryMillis, 1); 228 }; 229 230 /** 231 * Wait for <code>check</code> to return true for each 232 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 233 * the message "waiting in main loop" for each <code>logInterval</code> times 234 * iteration to confirm the thread is alive. 235 * @param check user defined checker 236 * @param checkEveryMillis interval to call <code>check</code> 237 * @param logInterval interval to log for each 238 */ 239 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 240 int logInterval) throws InterruptedException { 241 Preconditions.checkNotNull(check, "check should not be null"); 242 Preconditions.checkArgument(checkEveryMillis >= 0, 243 "checkEveryMillis should be positive value"); 244 Preconditions.checkArgument(logInterval >= 0, 245 "logInterval should be positive value"); 246 247 int loggingCounter = logInterval; 248 do { 249 if (LOG.isDebugEnabled()) { 250 LOG.debug("Check the condition for main loop."); 251 } 252 253 boolean result = check.get(); 254 if (result) { 255 LOG.info("Exits the main loop."); 256 return; 257 } 258 if (--loggingCounter <= 0) { 259 LOG.info("Waiting in main loop."); 260 loggingCounter = logInterval; 261 } 262 263 Thread.sleep(checkEveryMillis); 264 } while (true); 265 } 266 267 public interface CallbackHandler { 268 269 /** 270 * Called when the ResourceManager responds to a heartbeat with completed 271 * containers. If the response contains both completed containers and 272 * allocated containers, this will be called before containersAllocated. 273 */ 274 public void onContainersCompleted(List<ContainerStatus> statuses); 275 276 /** 277 * Called when the ResourceManager responds to a heartbeat with allocated 278 * containers. If the response containers both completed containers and 279 * allocated containers, this will be called after containersCompleted. 280 */ 281 public void onContainersAllocated(List<Container> containers); 282 283 /** 284 * Called when the ResourceManager wants the ApplicationMaster to shutdown 285 * for being out of sync etc. The ApplicationMaster should not unregister 286 * with the RM unless the ApplicationMaster wants to be the last attempt. 287 */ 288 public void onShutdownRequest(); 289 290 /** 291 * Called when nodes tracked by the ResourceManager have changed in health, 292 * availability etc. 293 */ 294 public void onNodesUpdated(List<NodeReport> updatedNodes); 295 296 public float getProgress(); 297 298 /** 299 * Called when error comes from RM communications as well as from errors in 300 * the callback itself from the app. Calling 301 * stop() is the recommended action. 302 * 303 * @param e 304 */ 305 public void onError(Throwable e); 306 } 307}