001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client.api; 020 021 import java.io.IOException; 022 import java.util.Collection; 023 import java.util.List; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceAudience.Private; 029 import org.apache.hadoop.classification.InterfaceAudience.Public; 030 import org.apache.hadoop.classification.InterfaceStability; 031 import org.apache.hadoop.service.AbstractService; 032 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 033 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 034 import org.apache.hadoop.yarn.api.records.ContainerId; 035 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 036 import org.apache.hadoop.yarn.api.records.Priority; 037 import org.apache.hadoop.yarn.api.records.Resource; 038 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 039 import org.apache.hadoop.yarn.exceptions.YarnException; 040 041 import com.google.common.base.Preconditions; 042 import com.google.common.base.Supplier; 043 import com.google.common.collect.ImmutableList; 044 045 @InterfaceAudience.Public 046 @InterfaceStability.Stable 047 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends 048 AbstractService { 049 private static final Log LOG = LogFactory.getLog(AMRMClient.class); 050 051 /** 052 * Create a new instance of AMRMClient. 053 * For usage: 054 * <pre> 055 * {@code 056 * AMRMClient.<T>createAMRMClientContainerRequest() 057 * }</pre> 058 * @return the newly create AMRMClient instance. 059 */ 060 @Public 061 public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() { 062 AMRMClient<T> client = new AMRMClientImpl<T>(); 063 return client; 064 } 065 066 private NMTokenCache nmTokenCache; 067 068 @Private 069 protected AMRMClient(String name) { 070 super(name); 071 nmTokenCache = NMTokenCache.getSingleton(); 072 } 073 074 /** 075 * Object to represent a single container request for resources. Scheduler 076 * documentation should be consulted for the specifics of how the parameters 077 * are honored. 078 * 079 * By default, YARN schedulers try to allocate containers at the requested 080 * locations but they may relax the constraints in order to expedite meeting 081 * allocations limits. They first relax the constraint to the same rack as the 082 * requested node and then to anywhere in the cluster. The relaxLocality flag 083 * may be used to disable locality relaxation and request containers at only 084 * specific locations. The following conditions apply. 085 * <ul> 086 * <li>Within a priority, all container requests must have the same value for 087 * locality relaxation. Either enabled or disabled.</li> 088 * <li>If locality relaxation is disabled, then across requests, locations at 089 * different network levels may not be specified. E.g. its invalid to make a 090 * request for a specific node and another request for a specific rack.</li> 091 * <li>If locality relaxation is disabled, then only within the same request, 092 * a node and its rack may be specified together. This allows for a specific 093 * rack with a preference for a specific node within that rack.</li> 094 * <li></li> 095 * </ul> 096 * To re-enable locality relaxation at a given priority, all pending requests 097 * with locality relaxation disabled must be first removed. Then they can be 098 * added back with locality relaxation enabled. 099 * 100 * All getters return immutable values. 101 */ 102 public static class ContainerRequest { 103 final Resource capability; 104 final List<String> nodes; 105 final List<String> racks; 106 final Priority priority; 107 final boolean relaxLocality; 108 final String nodeLabelsExpression; 109 110 /** 111 * Instantiates a {@link ContainerRequest} with the given constraints and 112 * locality relaxation enabled. 113 * 114 * @param capability 115 * The {@link Resource} to be requested for each container. 116 * @param nodes 117 * Any hosts to request that the containers are placed on. 118 * @param racks 119 * Any racks to request that the containers are placed on. The 120 * racks corresponding to any hosts requested will be automatically 121 * added to this list. 122 * @param priority 123 * The priority at which to request the containers. Higher 124 * priorities have lower numerical values. 125 */ 126 public ContainerRequest(Resource capability, String[] nodes, 127 String[] racks, Priority priority) { 128 this(capability, nodes, racks, priority, true, null); 129 } 130 131 /** 132 * Instantiates a {@link ContainerRequest} with the given constraints. 133 * 134 * @param capability 135 * The {@link Resource} to be requested for each container. 136 * @param nodes 137 * Any hosts to request that the containers are placed on. 138 * @param racks 139 * Any racks to request that the containers are placed on. The 140 * racks corresponding to any hosts requested will be automatically 141 * added to this list. 142 * @param priority 143 * The priority at which to request the containers. Higher 144 * priorities have lower numerical values. 145 * @param relaxLocality 146 * If true, containers for this request may be assigned on hosts 147 * and racks other than the ones explicitly requested. 148 */ 149 public ContainerRequest(Resource capability, String[] nodes, 150 String[] racks, Priority priority, boolean relaxLocality) { 151 this(capability, nodes, racks, priority, relaxLocality, null); 152 } 153 154 /** 155 * Instantiates a {@link ContainerRequest} with the given constraints. 156 * 157 * @param capability 158 * The {@link Resource} to be requested for each container. 159 * @param nodes 160 * Any hosts to request that the containers are placed on. 161 * @param racks 162 * Any racks to request that the containers are placed on. The 163 * racks corresponding to any hosts requested will be automatically 164 * added to this list. 165 * @param priority 166 * The priority at which to request the containers. Higher 167 * priorities have lower numerical values. 168 * @param relaxLocality 169 * If true, containers for this request may be assigned on hosts 170 * and racks other than the ones explicitly requested. 171 * @param nodeLabelsExpression 172 * Set node labels to allocate resource 173 */ 174 public ContainerRequest(Resource capability, String[] nodes, 175 String[] racks, Priority priority, boolean relaxLocality, 176 String nodeLabelsExpression) { 177 // Validate request 178 Preconditions.checkArgument(capability != null, 179 "The Resource to be requested for each container " + 180 "should not be null "); 181 Preconditions.checkArgument(priority != null, 182 "The priority at which to request containers should not be null "); 183 Preconditions.checkArgument( 184 !(!relaxLocality && (racks == null || racks.length == 0) 185 && (nodes == null || nodes.length == 0)), 186 "Can't turn off locality relaxation on a " + 187 "request with no location constraints"); 188 this.capability = capability; 189 this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); 190 this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); 191 this.priority = priority; 192 this.relaxLocality = relaxLocality; 193 this.nodeLabelsExpression = nodeLabelsExpression; 194 } 195 196 public Resource getCapability() { 197 return capability; 198 } 199 200 public List<String> getNodes() { 201 return nodes; 202 } 203 204 public List<String> getRacks() { 205 return racks; 206 } 207 208 public Priority getPriority() { 209 return priority; 210 } 211 212 public boolean getRelaxLocality() { 213 return relaxLocality; 214 } 215 216 public String getNodeLabelExpression() { 217 return nodeLabelsExpression; 218 } 219 220 public String toString() { 221 StringBuilder sb = new StringBuilder(); 222 sb.append("Capability[").append(capability).append("]"); 223 sb.append("Priority[").append(priority).append("]"); 224 return sb.toString(); 225 } 226 } 227 228 /** 229 * Register the application master. This must be called before any 230 * other interaction 231 * @param appHostName Name of the host on which master is running 232 * @param appHostPort Port master is listening on 233 * @param appTrackingUrl URL at which the master info can be seen 234 * @return <code>RegisterApplicationMasterResponse</code> 235 * @throws YarnException 236 * @throws IOException 237 */ 238 public abstract RegisterApplicationMasterResponse 239 registerApplicationMaster(String appHostName, 240 int appHostPort, 241 String appTrackingUrl) 242 throws YarnException, IOException; 243 244 /** 245 * Request additional containers and receive new container allocations. 246 * Requests made via <code>addContainerRequest</code> are sent to the 247 * <code>ResourceManager</code>. New containers assigned to the master are 248 * retrieved. Status of completed containers and node health updates are also 249 * retrieved. This also doubles up as a heartbeat to the ResourceManager and 250 * must be made periodically. The call may not always return any new 251 * allocations of containers. App should not make concurrent allocate 252 * requests. May cause request loss. 253 * 254 * <p> 255 * Note : If the user has not removed container requests that have already 256 * been satisfied, then the re-register may end up sending the entire 257 * container requests to the RM (including matched requests). Which would mean 258 * the RM could end up giving it a lot of new allocated containers. 259 * </p> 260 * 261 * @param progressIndicator Indicates progress made by the master 262 * @return the response of the allocate request 263 * @throws YarnException 264 * @throws IOException 265 */ 266 public abstract AllocateResponse allocate(float progressIndicator) 267 throws YarnException, IOException; 268 269 /** 270 * Unregister the application master. This must be called in the end. 271 * @param appStatus Success/Failure status of the master 272 * @param appMessage Diagnostics message on failure 273 * @param appTrackingUrl New URL to get master info 274 * @throws YarnException 275 * @throws IOException 276 */ 277 public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus, 278 String appMessage, 279 String appTrackingUrl) 280 throws YarnException, IOException; 281 282 /** 283 * Request containers for resources before calling <code>allocate</code> 284 * @param req Resource request 285 */ 286 public abstract void addContainerRequest(T req); 287 288 /** 289 * Remove previous container request. The previous container request may have 290 * already been sent to the ResourceManager. So even after the remove request 291 * the app must be prepared to receive an allocation for the previous request 292 * even after the remove request 293 * @param req Resource request 294 */ 295 public abstract void removeContainerRequest(T req); 296 297 /** 298 * Release containers assigned by the Resource Manager. If the app cannot use 299 * the container or wants to give up the container then it can release them. 300 * The app needs to make new requests for the released resource capability if 301 * it still needs it. eg. it released non-local resources 302 * @param containerId 303 */ 304 public abstract void releaseAssignedContainer(ContainerId containerId); 305 306 /** 307 * Get the currently available resources in the cluster. 308 * A valid value is available after a call to allocate has been made 309 * @return Currently available resources 310 */ 311 public abstract Resource getAvailableResources(); 312 313 /** 314 * Get the current number of nodes in the cluster. 315 * A valid values is available after a call to allocate has been made 316 * @return Current number of nodes in the cluster 317 */ 318 public abstract int getClusterNodeCount(); 319 320 /** 321 * Get outstanding <code>ContainerRequest</code>s matching the given 322 * parameters. These ContainerRequests should have been added via 323 * <code>addContainerRequest</code> earlier in the lifecycle. For performance, 324 * the AMRMClient may return its internal collection directly without creating 325 * a copy. Users should not perform mutable operations on the return value. 326 * Each collection in the list contains requests with identical 327 * <code>Resource</code> size that fit in the given capability. In a 328 * collection, requests will be returned in the same order as they were added. 329 * @return Collection of request matching the parameters 330 */ 331 public abstract List<? extends Collection<T>> getMatchingRequests( 332 Priority priority, 333 String resourceName, 334 Resource capability); 335 336 /** 337 * Update application's blacklist with addition or removal resources. 338 * 339 * @param blacklistAdditions list of resources which should be added to the 340 * application blacklist 341 * @param blacklistRemovals list of resources which should be removed from the 342 * application blacklist 343 */ 344 public abstract void updateBlacklist(List<String> blacklistAdditions, 345 List<String> blacklistRemovals); 346 347 /** 348 * Set the NM token cache for the <code>AMRMClient</code>. This cache must 349 * be shared with the {@link NMClient} used to manage containers for the 350 * <code>AMRMClient</code> 351 * <p/> 352 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 353 * singleton instance will be used. 354 * 355 * @param nmTokenCache the NM token cache to use. 356 */ 357 public void setNMTokenCache(NMTokenCache nmTokenCache) { 358 this.nmTokenCache = nmTokenCache; 359 } 360 361 /** 362 * Get the NM token cache of the <code>AMRMClient</code>. This cache must be 363 * shared with the {@link NMClient} used to manage containers for the 364 * <code>AMRMClient</code>. 365 * <p/> 366 * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()} 367 * singleton instance will be used. 368 * 369 * @return the NM token cache. 370 */ 371 public NMTokenCache getNMTokenCache() { 372 return nmTokenCache; 373 } 374 375 /** 376 * Wait for <code>check</code> to return true for each 1000 ms. 377 * See also {@link #waitFor(com.google.common.base.Supplier, int)} 378 * and {@link #waitFor(com.google.common.base.Supplier, int, int)} 379 * @param check 380 */ 381 public void waitFor(Supplier<Boolean> check) throws InterruptedException { 382 waitFor(check, 1000); 383 } 384 385 /** 386 * Wait for <code>check</code> to return true for each 387 * <code>checkEveryMillis</code> ms. 388 * See also {@link #waitFor(com.google.common.base.Supplier, int, int)} 389 * @param check user defined checker 390 * @param checkEveryMillis interval to call <code>check</code> 391 */ 392 public void waitFor(Supplier<Boolean> check, int checkEveryMillis) 393 throws InterruptedException { 394 waitFor(check, checkEveryMillis, 1); 395 } 396 397 /** 398 * Wait for <code>check</code> to return true for each 399 * <code>checkEveryMillis</code> ms. In the main loop, this method will log 400 * the message "waiting in main loop" for each <code>logInterval</code> times 401 * iteration to confirm the thread is alive. 402 * @param check user defined checker 403 * @param checkEveryMillis interval to call <code>check</code> 404 * @param logInterval interval to log for each 405 */ 406 public void waitFor(Supplier<Boolean> check, int checkEveryMillis, 407 int logInterval) throws InterruptedException { 408 Preconditions.checkNotNull(check, "check should not be null"); 409 Preconditions.checkArgument(checkEveryMillis >= 0, 410 "checkEveryMillis should be positive value"); 411 Preconditions.checkArgument(logInterval >= 0, 412 "logInterval should be positive value"); 413 414 int loggingCounter = logInterval; 415 do { 416 if (LOG.isDebugEnabled()) { 417 LOG.debug("Check the condition for main loop."); 418 } 419 420 boolean result = check.get(); 421 if (result) { 422 LOG.info("Exits the main loop."); 423 return; 424 } 425 if (--loggingCounter <= 0) { 426 LOG.info("Waiting in main loop."); 427 loggingCounter = logInterval; 428 } 429 430 Thread.sleep(checkEveryMillis); 431 } while (true); 432 } 433 434 }