001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client.api; 020 021import java.io.IOException; 022import java.util.Collection; 023import java.util.List; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceAudience.Private; 029import org.apache.hadoop.classification.InterfaceAudience.Public; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.service.AbstractService; 032import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 033import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 034 035import org.apache.hadoop.yarn.api.records.Container; 036import org.apache.hadoop.yarn.api.records.ContainerId; 037import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 038import org.apache.hadoop.yarn.api.records.Priority; 039import org.apache.hadoop.yarn.api.records.Resource; 040import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 041import org.apache.hadoop.yarn.exceptions.YarnException; 042 043import com.google.common.base.Preconditions; 044import com.google.common.base.Supplier; 045import com.google.common.collect.ImmutableList; 046 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends 050 AbstractService { 051 private static final Log LOG = LogFactory.getLog(AMRMClient.class); 052 053 /** 054 * Create a new instance of AMRMClient. 055 * For usage: 056 * <pre> 057 * {@code 058 * AMRMClient.<T>createAMRMClientContainerRequest() 059 * }</pre> 060 * @return the newly create AMRMClient instance. 061 */ 062 @Public 063 public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() { 064 AMRMClient<T> client = new AMRMClientImpl<T>(); 065 return client; 066 } 067 068 private NMTokenCache nmTokenCache; 069 070 @Private 071 protected AMRMClient(String name) { 072 super(name); 073 nmTokenCache = NMTokenCache.getSingleton(); 074 } 075 076 /** 077 * Object to represent a single container request for resources. Scheduler 078 * documentation should be consulted for the specifics of how the parameters 079 * are honored. 080 * 081 * By default, YARN schedulers try to allocate containers at the requested 082 * locations but they may relax the constraints in order to expedite meeting 083 * allocations limits. They first relax the constraint to the same rack as the 084 * requested node and then to anywhere in the cluster. The relaxLocality flag 085 * may be used to disable locality relaxation and request containers at only 086 * specific locations. The following conditions apply. 087 * <ul> 088 * <li>Within a priority, all container requests must have the same value for 089 * locality relaxation. Either enabled or disabled.</li> 090 * <li>If locality relaxation is disabled, then across requests, locations at 091 * different network levels may not be specified. E.g. its invalid to make a 092 * request for a specific node and another request for a specific rack.</li> 093 * <li>If locality relaxation is disabled, then only within the same request, 094 * a node and its rack may be specified together. This allows for a specific 095 * rack with a preference for a specific node within that rack.</li> 096 * <li></li> 097 * </ul> 098 * To re-enable locality relaxation at a given priority, all pending requests 099 * with locality relaxation disabled must be first removed. Then they can be 100 * added back with locality relaxation enabled. 101 * 102 * All getters return immutable values. 103 */ 104 public static class ContainerRequest { 105 final Resource capability; 106 final List<String> nodes; 107 final List<String> racks; 108 final Priority priority; 109 final boolean relaxLocality; 110 final String nodeLabelsExpression; 111 112 /** 113 * Instantiates a {@link ContainerRequest} with the given constraints and 114 * locality relaxation enabled. 115 * 116 * @param capability 117 * The {@link Resource} to be requested for each container. 118 * @param nodes 119 * Any hosts to request that the containers are placed on. 120 * @param racks 121 * Any racks to request that the containers are placed on. The 122 * racks corresponding to any hosts requested will be automatically 123 * added to this list. 124 * @param priority 125 * The priority at which to request the containers. Higher 126 * priorities have lower numerical values. 127 */ 128 public ContainerRequest(Resource capability, String[] nodes, 129 String[] racks, Priority priority) { 130 this(capability, nodes, racks, priority, true, null); 131 } 132 133 /** 134 * Instantiates a {@link ContainerRequest} with the given constraints. 135 * 136 * @param capability 137 * The {@link Resource} to be requested for each container. 138 * @param nodes 139 * Any hosts to request that the containers are placed on. 140 * @param racks 141 * Any racks to request that the containers are placed on. The 142 * racks corresponding to any hosts requested will be automatically 143 * added to this list. 144 * @param priority 145 * The priority at which to request the containers. Higher 146 * priorities have lower numerical values. 147 * @param relaxLocality 148 * If true, containers for this request may be assigned on hosts 149 * and racks other than the ones explicitly requested. 150 */ 151 public ContainerRequest(Resource capability, String[] nodes, 152 String[] racks, Priority priority, boolean relaxLocality) { 153 this(capability, nodes, racks, priority, relaxLocality, null); 154 } 155 156 /** 157 * Instantiates a {@link ContainerRequest} with the given constraints. 158 * 159 * @param capability 160 * The {@link Resource} to be requested for each container. 161 * @param nodes 162 * Any hosts to request that the containers are placed on. 163 * @param racks 164 * Any racks to request that the containers are placed on. The 165 * racks corresponding to any hosts requested will be automatically 166 * added to this list. 167 * @param priority 168 * The priority at which to request the containers. Higher 169 * priorities have lower numerical values. 170 * @param relaxLocality 171 * If true, containers for this request may be assigned on hosts 172 * and racks other than the ones explicitly requested. 173 * @param nodeLabelsExpression 174 * Set node labels to allocate resource, now we only support 175 * asking for only a single node label 176 */ 177 public ContainerRequest(Resource capability, String[] nodes, 178 String[] racks, Priority priority, boolean relaxLocality, 179 String nodeLabelsExpression) { 180 // Validate request 181 Preconditions.checkArgument(capability != null, 182 "The Resource to be requested for each container " + 183 "should not be null "); 184 Preconditions.checkArgument(priority != null, 185 "The priority at which to request containers should not be null "); 186 Preconditions.checkArgument( 187 !(!relaxLocality && (racks == null || racks.length == 0) 188 && (nodes == null || nodes.length == 0)), 189 "Can't turn off locality relaxation on a " + 190 "request with no location constraints"); 191 this.capability = capability; 192 this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); 193 this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); 194 this.priority = priority; 195 this.relaxLocality = relaxLocality; 196 this.nodeLabelsExpression = nodeLabelsExpression; 197 } 198 199 public Resource getCapability() { 200 return capability; 201 } 202 203 public List<String> getNodes() { 204 return nodes; 205 } 206 207 public List<String> getRacks() { 208 return racks; 209 } 210 211 public Priority getPriority() { 212 return priority; 213 } 214 215 public boolean getRelaxLocality() { 216 return relaxLocality; 217 } 218 219 public String getNodeLabelExpression() { 220 return nodeLabelsExpression; 221 } 222 223 public String toString() { 224 StringBuilder sb = new StringBuilder(); 225 sb.append("Capability[").append(capability).append("]"); 226 sb.append("Priority[").append(priority).append("]"); 227 return sb.toString(); 228 } 229 } 230 231 /** 232 * Register the application master. This must be called before any 233 * other interaction 234 * @param appHostName Name of the host on which master is running 235 * @param appHostPort Port master is listening on 236 * @param appTrackingUrl URL at which the master info can be seen 237 * @return <code>RegisterApplicationMasterResponse</code> 238 * @throws YarnException 239 * @throws IOException 240 */ 241 public abstract RegisterApplicationMasterResponse 242 registerApplicationMaster(String appHostName, 243 int appHostPort, 244 String appTrackingUrl) 245 throws YarnException, IOException; 246 247 /** 248 * Request additional containers and receive new container allocations. 249 * Requests made via <code>addContainerRequest</code> are sent to the 250 * <code>ResourceManager</code>. New containers assigned to the master are 251 * retrieved. Status of completed containers and node health updates are also 252 * retrieved. This also doubles up as a heartbeat to the ResourceManager and 253 * must be made periodically. The call may not always return any new 254 * allocations of containers. App should not make concurrent allocate 255 * requests. May cause request loss. 256 * 257 * <p> 258 * Note : If the user has not removed container requests that have already 259 * been satisfied, then the re-register may end up sending the entire 260 * container requests to the RM (including matched requests). Which would mean 261 * the RM could end up giving it a lot of new allocated containers. 262 * </p> 263 * 264 * @param progressIndicator Indicates progress made by the master 265 * @return the response of the allocate request 266 * @throws YarnException 267 * @throws IOException 268 */ 269 public abstract AllocateResponse allocate(float progressIndicator) 270 throws YarnException, IOException; 271 272 /** 273 * Unregister the application master. This must be called in the end. 274 * @param appStatus Success/Failure status of the master 275 * @param appMessage Diagnostics message on failure 276 * @param appTrackingUrl New URL to get master info 277 * @throws YarnException 278 * @throws IOException 279 */ 280 public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus, 281 String appMessage, 282 String appTrackingUrl) 283 throws YarnException, IOException; 284 285 /** 286 * Request containers for resources before calling <code>allocate</code> 287 * @param req Resource request 288 */ 289 public abstract void addContainerRequest(T req); 290 291 /** 292 * Remove previous container request. The previous container request may have 293 * already been sent to the ResourceManager. So even after the remove request 294 * the app must be prepared to receive an allocation for the previous request 295 * even after the remove request 296 * @param req Resource request 297 */ 298 public abstract void removeContainerRequest(T req); 299 300 /** 301 * Request container resource change before calling <code>allocate</code>. 302 * Any previous pending resource change request of the same container will be 303 * removed. 304 * 305 * Application that calls this method is expected to maintain the 306 * <code>Container</code>s that are returned from previous successful 307 * allocations or resource changes. By passing in the existing container and a 308 * target resource capability to this method, the application requests the 309 * ResourceManager to change the existing resource allocation to the target 310 * resource allocation. 311 * 312 * @param container The container returned from the last successful resource 313 * allocation or resource change 314 * @param capability The target resource capability of the container 315 */ 316 public abstract void requestContainerResourceChange( 317 Container container, Resource capability); 318 319 /** 320 * Release containers assigned by the Resource Manager. If the app cannot use 321 * the container or wants to give up the container then it can release them. 322 * The app needs to make new requests for the released resource capability if 323 * it still needs it. eg. it released non-local resources 324 * @param containerId 325 */ 326 public abstract void releaseAssignedContainer(ContainerId containerId); 327 328 /** 329 * Get the currently available resources in the cluster. 330 * A valid value is available after a call to allocate has been made 331 * @return Currently available resources 332 */ 333 public abstract Resource getAvailableResources(); 334 335 /** 336 * Get the current number of nodes in the cluster. 337 * A valid values is available after a call to allocate has been made 338 * @return Current number of nodes in the cluster 339 */ 340 public abstract int getClusterNodeCount(); 341 342 /** 343 * Get outstanding <code>ContainerRequest</code>s matching the given 344 * parameters. These ContainerRequests should have been added via 345 * <code>addContainerRequest</code> earlier in the lifecycle. For performance, 346 * the AMRMClient may return its internal collection directly without creating 347 * a copy. Users should not perform mutable operations on the return value. 348 * Each collection in the list contains requests with identical 349 * <code>Resource</code> size that fit in the given capability. In a 350 * collection, requests will be returned in the same order as they were added. 351 * @return Collection of request matching the parameters 352 */ 353 public abstract List<? extends Collection<T>> getMatchingRequests( 354 Priority priority, 355 String resourceName, 356 Resource capability); 357 358 /** 359 * Update application's blacklist with addition or removal resources. 360 * 361 * @param blacklistAdditions list of resources which should be added to the 362 * application blacklist 363 * @param blacklistRemovals list of resources which should be removed from the 364 * application blacklist 365 */ 366 public abstract void updateBlacklist(List<String> blacklistAdditions, 367 List<String> blacklistRemovals); 368 369 /** 370 * Set the NM token cache for the <code>AMRMClient</code>. This cache must 371 * be shared with the {@link NMClient} used to manage containers for the 372 * <code>AMRMClient</code> 373 * <p> 374 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 375 * singleton instance will be used. 376 * 377 * @param nmTokenCache the NM token cache to use. 378 */ 379 public void setNMTokenCache(NMTokenCache nmTokenCache) { 380 this.nmTokenCache = nmTokenCache; 381 } 382 383 /** 384 * Get the NM token cache of the <code>AMRMClient</code>. This cache must be 385 * shared with the {@link NMClient} used to manage containers for the 386 * <code>AMRMClient</code>. 387 * <p> 388 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 389 * singleton instance will be used. 390 * 391 * @return the NM token cache. 392 */ 393 public NMTokenCache getNMTokenCache() { 394 return nmTokenCache; 395 } 396 397 /** 398 * Wait for <code>check</code> to return true for each 1000 ms. 399 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 400 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 401 * @param check 402 */ 403 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 404 waitFor(check, 1000); 405 } 406 407 /** 408 * Wait for <code>check</code> to return true for each 409 * <code>checkEveryMillis</code> ms. 410 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 411 * @param check user defined checker 412 * @param checkEveryMillis interval to call <code>check</code> 413 */ 414 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 415 throws InterruptedException { 416 waitFor(check, checkEveryMillis, 1); 417 } 418 419 /** 420 * Wait for <code>check</code> to return true for each 421 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 422 * the message "waiting in main loop" for each <code>logInterval</code> times 423 * iteration to confirm the thread is alive. 424 * @param check user defined checker 425 * @param checkEveryMillis interval to call <code>check</code> 426 * @param logInterval interval to log for each 427 */ 428 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 429 int logInterval) throws InterruptedException { 430 Preconditions.checkNotNull(check, "check should not be null"); 431 Preconditions.checkArgument(checkEveryMillis >= 0, 432 "checkEveryMillis should be positive value"); 433 Preconditions.checkArgument(logInterval >= 0, 434 "logInterval should be positive value"); 435 436 int loggingCounter = logInterval; 437 do { 438 if (LOG.isDebugEnabled()) { 439 LOG.debug("Check the condition for main loop."); 440 } 441 442 boolean result = check.get(); 443 if (result) { 444 LOG.info("Exits the main loop."); 445 return; 446 } 447 if (--loggingCounter <= 0) { 448 LOG.info("Waiting in main loop."); 449 loggingCounter = logInterval; 450 } 451 452 Thread.sleep(checkEveryMillis); 453 } while (true); 454 } 455 456}