001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client.api; 020 021import java.io.IOException; 022import java.util.Collection; 023import java.util.List; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceAudience.Private; 029import org.apache.hadoop.classification.InterfaceAudience.Public; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.service.AbstractService; 032import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 033import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 034import org.apache.hadoop.yarn.api.records.ContainerId; 035import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 036import org.apache.hadoop.yarn.api.records.Priority; 037import org.apache.hadoop.yarn.api.records.Resource; 038import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 039import org.apache.hadoop.yarn.exceptions.YarnException; 040 041import com.google.common.base.Preconditions; 042import com.google.common.base.Supplier; 043import com.google.common.collect.ImmutableList; 044 045@InterfaceAudience.Public 046@InterfaceStability.Stable 047public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends 048 AbstractService { 049 private static final Log LOG = LogFactory.getLog(AMRMClient.class); 050 051 /** 052 * Create a new instance of AMRMClient. 053 * For usage: 054 * <pre> 055 * {@code 056 * AMRMClient.<T>createAMRMClientContainerRequest() 057 * }</pre> 058 * @return the newly create AMRMClient instance. 059 */ 060 @Public 061 public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() { 062 AMRMClient<T> client = new AMRMClientImpl<T>(); 063 return client; 064 } 065 066 private NMTokenCache nmTokenCache; 067 068 @Private 069 protected AMRMClient(String name) { 070 super(name); 071 nmTokenCache = NMTokenCache.getSingleton(); 072 } 073 074 /** 075 * Object to represent a single container request for resources. Scheduler 076 * documentation should be consulted for the specifics of how the parameters 077 * are honored. 078 * 079 * By default, YARN schedulers try to allocate containers at the requested 080 * locations but they may relax the constraints in order to expedite meeting 081 * allocations limits. They first relax the constraint to the same rack as the 082 * requested node and then to anywhere in the cluster. The relaxLocality flag 083 * may be used to disable locality relaxation and request containers at only 084 * specific locations. The following conditions apply. 085 * <ul> 086 * <li>Within a priority, all container requests must have the same value for 087 * locality relaxation. Either enabled or disabled.</li> 088 * <li>If locality relaxation is disabled, then across requests, locations at 089 * different network levels may not be specified. E.g. its invalid to make a 090 * request for a specific node and another request for a specific rack.</li> 091 * <li>If locality relaxation is disabled, then only within the same request, 092 * a node and its rack may be specified together. This allows for a specific 093 * rack with a preference for a specific node within that rack.</li> 094 * <li></li> 095 * </ul> 096 * To re-enable locality relaxation at a given priority, all pending requests 097 * with locality relaxation disabled must be first removed. Then they can be 098 * added back with locality relaxation enabled. 099 * 100 * All getters return immutable values. 101 */ 102 public static class ContainerRequest { 103 final Resource capability; 104 final List<String> nodes; 105 final List<String> racks; 106 final Priority priority; 107 final boolean relaxLocality; 108 final String nodeLabelsExpression; 109 110 /** 111 * Instantiates a {@link ContainerRequest} with the given constraints and 112 * locality relaxation enabled. 113 * 114 * @param capability 115 * The {@link Resource} to be requested for each container. 116 * @param nodes 117 * Any hosts to request that the containers are placed on. 118 * @param racks 119 * Any racks to request that the containers are placed on. The 120 * racks corresponding to any hosts requested will be automatically 121 * added to this list. 122 * @param priority 123 * The priority at which to request the containers. Higher 124 * priorities have lower numerical values. 125 */ 126 public ContainerRequest(Resource capability, String[] nodes, 127 String[] racks, Priority priority) { 128 this(capability, nodes, racks, priority, true, null); 129 } 130 131 /** 132 * Instantiates a {@link ContainerRequest} with the given constraints. 133 * 134 * @param capability 135 * The {@link Resource} to be requested for each container. 136 * @param nodes 137 * Any hosts to request that the containers are placed on. 138 * @param racks 139 * Any racks to request that the containers are placed on. The 140 * racks corresponding to any hosts requested will be automatically 141 * added to this list. 142 * @param priority 143 * The priority at which to request the containers. Higher 144 * priorities have lower numerical values. 145 * @param relaxLocality 146 * If true, containers for this request may be assigned on hosts 147 * and racks other than the ones explicitly requested. 148 */ 149 public ContainerRequest(Resource capability, String[] nodes, 150 String[] racks, Priority priority, boolean relaxLocality) { 151 this(capability, nodes, racks, priority, relaxLocality, null); 152 } 153 154 /** 155 * Instantiates a {@link ContainerRequest} with the given constraints. 156 * 157 * @param capability 158 * The {@link Resource} to be requested for each container. 159 * @param nodes 160 * Any hosts to request that the containers are placed on. 161 * @param racks 162 * Any racks to request that the containers are placed on. The 163 * racks corresponding to any hosts requested will be automatically 164 * added to this list. 165 * @param priority 166 * The priority at which to request the containers. Higher 167 * priorities have lower numerical values. 168 * @param relaxLocality 169 * If true, containers for this request may be assigned on hosts 170 * and racks other than the ones explicitly requested. 171 * @param nodeLabelsExpression 172 * Set node labels to allocate resource, now we only support 173 * asking for only a single node label 174 */ 175 public ContainerRequest(Resource capability, String[] nodes, 176 String[] racks, Priority priority, boolean relaxLocality, 177 String nodeLabelsExpression) { 178 // Validate request 179 Preconditions.checkArgument(capability != null, 180 "The Resource to be requested for each container " + 181 "should not be null "); 182 Preconditions.checkArgument(priority != null, 183 "The priority at which to request containers should not be null "); 184 Preconditions.checkArgument( 185 !(!relaxLocality && (racks == null || racks.length == 0) 186 && (nodes == null || nodes.length == 0)), 187 "Can't turn off locality relaxation on a " + 188 "request with no location constraints"); 189 this.capability = capability; 190 this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); 191 this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); 192 this.priority = priority; 193 this.relaxLocality = relaxLocality; 194 this.nodeLabelsExpression = nodeLabelsExpression; 195 } 196 197 public Resource getCapability() { 198 return capability; 199 } 200 201 public List<String> getNodes() { 202 return nodes; 203 } 204 205 public List<String> getRacks() { 206 return racks; 207 } 208 209 public Priority getPriority() { 210 return priority; 211 } 212 213 public boolean getRelaxLocality() { 214 return relaxLocality; 215 } 216 217 public String getNodeLabelExpression() { 218 return nodeLabelsExpression; 219 } 220 221 public String toString() { 222 StringBuilder sb = new StringBuilder(); 223 sb.append("Capability[").append(capability).append("]"); 224 sb.append("Priority[").append(priority).append("]"); 225 return sb.toString(); 226 } 227 } 228 229 /** 230 * Register the application master. This must be called before any 231 * other interaction 232 * @param appHostName Name of the host on which master is running 233 * @param appHostPort Port master is listening on 234 * @param appTrackingUrl URL at which the master info can be seen 235 * @return <code>RegisterApplicationMasterResponse</code> 236 * @throws YarnException 237 * @throws IOException 238 */ 239 public abstract RegisterApplicationMasterResponse 240 registerApplicationMaster(String appHostName, 241 int appHostPort, 242 String appTrackingUrl) 243 throws YarnException, IOException; 244 245 /** 246 * Request additional containers and receive new container allocations. 247 * Requests made via <code>addContainerRequest</code> are sent to the 248 * <code>ResourceManager</code>. New containers assigned to the master are 249 * retrieved. Status of completed containers and node health updates are also 250 * retrieved. This also doubles up as a heartbeat to the ResourceManager and 251 * must be made periodically. The call may not always return any new 252 * allocations of containers. App should not make concurrent allocate 253 * requests. May cause request loss. 254 * 255 * <p> 256 * Note : If the user has not removed container requests that have already 257 * been satisfied, then the re-register may end up sending the entire 258 * container requests to the RM (including matched requests). Which would mean 259 * the RM could end up giving it a lot of new allocated containers. 260 * </p> 261 * 262 * @param progressIndicator Indicates progress made by the master 263 * @return the response of the allocate request 264 * @throws YarnException 265 * @throws IOException 266 */ 267 public abstract AllocateResponse allocate(float progressIndicator) 268 throws YarnException, IOException; 269 270 /** 271 * Unregister the application master. This must be called in the end. 272 * @param appStatus Success/Failure status of the master 273 * @param appMessage Diagnostics message on failure 274 * @param appTrackingUrl New URL to get master info 275 * @throws YarnException 276 * @throws IOException 277 */ 278 public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus, 279 String appMessage, 280 String appTrackingUrl) 281 throws YarnException, IOException; 282 283 /** 284 * Request containers for resources before calling <code>allocate</code> 285 * @param req Resource request 286 */ 287 public abstract void addContainerRequest(T req); 288 289 /** 290 * Remove previous container request. The previous container request may have 291 * already been sent to the ResourceManager. So even after the remove request 292 * the app must be prepared to receive an allocation for the previous request 293 * even after the remove request 294 * @param req Resource request 295 */ 296 public abstract void removeContainerRequest(T req); 297 298 /** 299 * Release containers assigned by the Resource Manager. If the app cannot use 300 * the container or wants to give up the container then it can release them. 301 * The app needs to make new requests for the released resource capability if 302 * it still needs it. eg. it released non-local resources 303 * @param containerId 304 */ 305 public abstract void releaseAssignedContainer(ContainerId containerId); 306 307 /** 308 * Get the currently available resources in the cluster. 309 * A valid value is available after a call to allocate has been made 310 * @return Currently available resources 311 */ 312 public abstract Resource getAvailableResources(); 313 314 /** 315 * Get the current number of nodes in the cluster. 316 * A valid values is available after a call to allocate has been made 317 * @return Current number of nodes in the cluster 318 */ 319 public abstract int getClusterNodeCount(); 320 321 /** 322 * Get outstanding <code>ContainerRequest</code>s matching the given 323 * parameters. These ContainerRequests should have been added via 324 * <code>addContainerRequest</code> earlier in the lifecycle. For performance, 325 * the AMRMClient may return its internal collection directly without creating 326 * a copy. Users should not perform mutable operations on the return value. 327 * Each collection in the list contains requests with identical 328 * <code>Resource</code> size that fit in the given capability. In a 329 * collection, requests will be returned in the same order as they were added. 330 * @return Collection of request matching the parameters 331 */ 332 public abstract List<? extends Collection<T>> getMatchingRequests( 333 Priority priority, 334 String resourceName, 335 Resource capability); 336 337 /** 338 * Update application's blacklist with addition or removal resources. 339 * 340 * @param blacklistAdditions list of resources which should be added to the 341 * application blacklist 342 * @param blacklistRemovals list of resources which should be removed from the 343 * application blacklist 344 */ 345 public abstract void updateBlacklist(List<String> blacklistAdditions, 346 List<String> blacklistRemovals); 347 348 /** 349 * Set the NM token cache for the <code>AMRMClient</code>. This cache must 350 * be shared with the {@link NMClient} used to manage containers for the 351 * <code>AMRMClient</code> 352 * <p> 353 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 354 * singleton instance will be used. 355 * 356 * @param nmTokenCache the NM token cache to use. 357 */ 358 public void setNMTokenCache(NMTokenCache nmTokenCache) { 359 this.nmTokenCache = nmTokenCache; 360 } 361 362 /** 363 * Get the NM token cache of the <code>AMRMClient</code>. This cache must be 364 * shared with the {@link NMClient} used to manage containers for the 365 * <code>AMRMClient</code>. 366 * <p> 367 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 368 * singleton instance will be used. 369 * 370 * @return the NM token cache. 371 */ 372 public NMTokenCache getNMTokenCache() { 373 return nmTokenCache; 374 } 375 376 /** 377 * Wait for <code>check</code> to return true for each 1000 ms. 378 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 379 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 380 * @param check 381 */ 382 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 383 waitFor(check, 1000); 384 } 385 386 /** 387 * Wait for <code>check</code> to return true for each 388 * <code>checkEveryMillis</code> ms. 389 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 390 * @param check user defined checker 391 * @param checkEveryMillis interval to call <code>check</code> 392 */ 393 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 394 throws InterruptedException { 395 waitFor(check, checkEveryMillis, 1); 396 } 397 398 /** 399 * Wait for <code>check</code> to return true for each 400 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 401 * the message "waiting in main loop" for each <code>logInterval</code> times 402 * iteration to confirm the thread is alive. 403 * @param check user defined checker 404 * @param checkEveryMillis interval to call <code>check</code> 405 * @param logInterval interval to log for each 406 */ 407 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 408 int logInterval) throws InterruptedException { 409 Preconditions.checkNotNull(check, "check should not be null"); 410 Preconditions.checkArgument(checkEveryMillis >= 0, 411 "checkEveryMillis should be positive value"); 412 Preconditions.checkArgument(logInterval >= 0, 413 "logInterval should be positive value"); 414 415 int loggingCounter = logInterval; 416 do { 417 if (LOG.isDebugEnabled()) { 418 LOG.debug("Check the condition for main loop."); 419 } 420 421 boolean result = check.get(); 422 if (result) { 423 LOG.info("Exits the main loop."); 424 return; 425 } 426 if (--loggingCounter <= 0) { 427 LOG.info("Waiting in main loop."); 428 loggingCounter = logInterval; 429 } 430 431 Thread.sleep(checkEveryMillis); 432 } while (true); 433 } 434 435}