001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client.api.async; 020 021import com.google.common.base.Preconditions; 022import com.google.common.base.Supplier; 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Private; 031import org.apache.hadoop.classification.InterfaceAudience.Public; 032import org.apache.hadoop.classification.InterfaceStability.Stable; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 035import org.apache.hadoop.yarn.api.records.Container; 036import org.apache.hadoop.yarn.api.records.ContainerId; 037import org.apache.hadoop.yarn.api.records.ContainerStatus; 038import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 039import org.apache.hadoop.yarn.api.records.NodeReport; 040import org.apache.hadoop.yarn.api.records.Priority; 041import org.apache.hadoop.yarn.api.records.Resource; 042import org.apache.hadoop.yarn.client.api.AMRMClient; 043import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 044import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; 045import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 046import org.apache.hadoop.yarn.exceptions.YarnException; 047 048import com.google.common.annotations.VisibleForTesting; 049 050/** 051 * <code>AMRMClientAsync</code> handles communication with the ResourceManager 052 * and provides asynchronous updates on events such as container allocations and 053 * completions. It contains a thread that sends periodic heartbeats to the 054 * ResourceManager. 055 * 056 * It should be used by implementing a CallbackHandler: 057 * <pre> 058 * {@code 059 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler { 060 * public void onContainersAllocated(List<Container> containers) { 061 * [run tasks on the containers] 062 * } 063 * 064 * public void onContainersCompleted(List<ContainerStatus> statuses) { 065 * [update progress, check whether app is done] 066 * } 067 * 068 * public void onNodesUpdated(List<NodeReport> updated) {} 069 * 070 * public void onReboot() {} 071 * } 072 * } 073 * </pre> 074 * 075 * The client's lifecycle should be managed similarly to the following: 076 * 077 * <pre> 078 * {@code 079 * AMRMClientAsync asyncClient = 080 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); 081 * asyncClient.init(conf); 082 * asyncClient.start(); 083 * RegisterApplicationMasterResponse response = asyncClient 084 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 085 * appMasterTrackingUrl); 086 * asyncClient.addContainerRequest(containerRequest); 087 * [... wait for application to complete] 088 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); 089 * asyncClient.stop(); 090 * } 091 * </pre> 092 */ 093@Public 094@Stable 095public abstract class AMRMClientAsync<T extends ContainerRequest> 096extends AbstractService { 097 private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); 098 099 protected final AMRMClient<T> client; 100 protected final CallbackHandler handler; 101 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); 102 103 public static <T extends ContainerRequest> AMRMClientAsync<T> 104 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 105 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); 106 } 107 108 public static <T extends ContainerRequest> AMRMClientAsync<T> 109 createAMRMClientAsync(AMRMClient<T> client, int intervalMs, 110 CallbackHandler callbackHandler) { 111 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); 112 } 113 114 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 115 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); 116 } 117 118 @Private 119 @VisibleForTesting 120 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, 121 CallbackHandler callbackHandler) { 122 super(AMRMClientAsync.class.getName()); 123 this.client = client; 124 this.heartbeatIntervalMs.set(intervalMs); 125 this.handler = callbackHandler; 126 } 127 128 public void setHeartbeatInterval(int interval) { 129 heartbeatIntervalMs.set(interval); 130 } 131 132 public abstract List<? extends Collection<T>> getMatchingRequests( 133 Priority priority, 134 String resourceName, 135 Resource capability); 136 137 /** 138 * Registers this application master with the resource manager. On successful 139 * registration, starts the heartbeating thread. 140 * @throws YarnException 141 * @throws IOException 142 */ 143 public abstract RegisterApplicationMasterResponse registerApplicationMaster( 144 String appHostName, int appHostPort, String appTrackingUrl) 145 throws YarnException, IOException; 146 147 /** 148 * Unregister the application master. This must be called in the end. 149 * @param appStatus Success/Failure status of the master 150 * @param appMessage Diagnostics message on failure 151 * @param appTrackingUrl New URL to get master info 152 * @throws YarnException 153 * @throws IOException 154 */ 155 public abstract void unregisterApplicationMaster( 156 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 157 throws YarnException, IOException; 158 159 /** 160 * Request containers for resources before calling <code>allocate</code> 161 * @param req Resource request 162 */ 163 public abstract void addContainerRequest(T req); 164 165 /** 166 * Remove previous container request. The previous container request may have 167 * already been sent to the ResourceManager. So even after the remove request 168 * the app must be prepared to receive an allocation for the previous request 169 * even after the remove request 170 * @param req Resource request 171 */ 172 public abstract void removeContainerRequest(T req); 173 174 /** 175 * Release containers assigned by the Resource Manager. If the app cannot use 176 * the container or wants to give up the container then it can release them. 177 * The app needs to make new requests for the released resource capability if 178 * it still needs it. eg. it released non-local resources 179 * @param containerId 180 */ 181 public abstract void releaseAssignedContainer(ContainerId containerId); 182 183 /** 184 * Get the currently available resources in the cluster. 185 * A valid value is available after a call to allocate has been made 186 * @return Currently available resources 187 */ 188 public abstract Resource getAvailableResources(); 189 190 /** 191 * Get the current number of nodes in the cluster. 192 * A valid values is available after a call to allocate has been made 193 * @return Current number of nodes in the cluster 194 */ 195 public abstract int getClusterNodeCount(); 196 197 /** 198 * Wait for <code>check</code> to return true for each 1000 ms. 199 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 200 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 201 * @param check 202 */ 203 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 204 waitFor(check, 1000); 205 } 206 207 /** 208 * Wait for <code>check</code> to return true for each 209 * <code>checkEveryMillis</code> ms. 210 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 211 * @param check user defined checker 212 * @param checkEveryMillis interval to call <code>check</code> 213 */ 214 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 215 throws InterruptedException { 216 waitFor(check, checkEveryMillis, 1); 217 }; 218 219 /** 220 * Wait for <code>check</code> to return true for each 221 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 222 * the message "waiting in main loop" for each <code>logInterval</code> times 223 * iteration to confirm the thread is alive. 224 * @param check user defined checker 225 * @param checkEveryMillis interval to call <code>check</code> 226 * @param logInterval interval to log for each 227 */ 228 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 229 int logInterval) throws InterruptedException { 230 Preconditions.checkNotNull(check, "check should not be null"); 231 Preconditions.checkArgument(checkEveryMillis >= 0, 232 "checkEveryMillis should be positive value"); 233 Preconditions.checkArgument(logInterval >= 0, 234 "logInterval should be positive value"); 235 236 int loggingCounter = logInterval; 237 do { 238 if (LOG.isDebugEnabled()) { 239 LOG.debug("Check the condition for main loop."); 240 } 241 242 boolean result = check.get(); 243 if (result) { 244 LOG.info("Exits the main loop."); 245 return; 246 } 247 if (--loggingCounter <= 0) { 248 LOG.info("Waiting in main loop."); 249 loggingCounter = logInterval; 250 } 251 252 Thread.sleep(checkEveryMillis); 253 } while (true); 254 } 255 256 public interface CallbackHandler { 257 258 /** 259 * Called when the ResourceManager responds to a heartbeat with completed 260 * containers. If the response contains both completed containers and 261 * allocated containers, this will be called before containersAllocated. 262 */ 263 public void onContainersCompleted(List<ContainerStatus> statuses); 264 265 /** 266 * Called when the ResourceManager responds to a heartbeat with allocated 267 * containers. If the response containers both completed containers and 268 * allocated containers, this will be called after containersCompleted. 269 */ 270 public void onContainersAllocated(List<Container> containers); 271 272 /** 273 * Called when the ResourceManager wants the ApplicationMaster to shutdown 274 * for being out of sync etc. The ApplicationMaster should not unregister 275 * with the RM unless the ApplicationMaster wants to be the last attempt. 276 */ 277 public void onShutdownRequest(); 278 279 /** 280 * Called when nodes tracked by the ResourceManager have changed in health, 281 * availability etc. 282 */ 283 public void onNodesUpdated(List<NodeReport> updatedNodes); 284 285 public float getProgress(); 286 287 /** 288 * Called when error comes from RM communications as well as from errors in 289 * the callback itself from the app. Calling 290 * stop() is the recommended action. 291 * 292 * @param e 293 */ 294 public void onError(Throwable e); 295 } 296}