/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.hadoop.yarn.client.api;
020
021import java.io.IOException;
022import java.util.Collection;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceAudience.Private;
029import org.apache.hadoop.classification.InterfaceAudience.Public;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.service.AbstractService;
032import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
033import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
034
035import org.apache.hadoop.yarn.api.records.Container;
036import org.apache.hadoop.yarn.api.records.ContainerId;
037import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
038import org.apache.hadoop.yarn.api.records.Priority;
039import org.apache.hadoop.yarn.api.records.Resource;
040import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
041import org.apache.hadoop.yarn.exceptions.YarnException;
042
043import com.google.common.base.Preconditions;
044import com.google.common.base.Supplier;
045import com.google.common.collect.ImmutableList;
046
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
050    AbstractService {
051  private static final Log LOG = LogFactory.getLog(AMRMClient.class);
052
053  /**
054   * Create a new instance of AMRMClient.
055   * For usage:
056   * <pre>
057   * {@code
058   * AMRMClient.<T>createAMRMClientContainerRequest()
059   * }</pre>
060   * @return the newly create AMRMClient instance.
061   */
062  @Public
063  public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
064    AMRMClient<T> client = new AMRMClientImpl<T>();
065    return client;
066  }
067
068  private NMTokenCache nmTokenCache;
069
070  @Private
071  protected AMRMClient(String name) {
072    super(name);
073    nmTokenCache = NMTokenCache.getSingleton();
074  }
075
076  /**
077   * Object to represent a single container request for resources. Scheduler
078   * documentation should be consulted for the specifics of how the parameters
079   * are honored.
080   * 
081   * By default, YARN schedulers try to allocate containers at the requested
082   * locations but they may relax the constraints in order to expedite meeting
083   * allocations limits. They first relax the constraint to the same rack as the
084   * requested node and then to anywhere in the cluster. The relaxLocality flag
085   * may be used to disable locality relaxation and request containers at only 
086   * specific locations. The following conditions apply.
087   * <ul>
088   * <li>Within a priority, all container requests must have the same value for
089   * locality relaxation. Either enabled or disabled.</li>
090   * <li>If locality relaxation is disabled, then across requests, locations at
091   * different network levels may not be specified. E.g. its invalid to make a
092   * request for a specific node and another request for a specific rack.</li>
093   * <li>If locality relaxation is disabled, then only within the same request,  
094   * a node and its rack may be specified together. This allows for a specific   
095   * rack with a preference for a specific node within that rack.</li>
096   * <li></li>
097   * </ul>
098   * To re-enable locality relaxation at a given priority, all pending requests 
099   * with locality relaxation disabled must be first removed. Then they can be 
100   * added back with locality relaxation enabled.
101   * 
102   * All getters return immutable values.
103   */
104  public static class ContainerRequest {
105    final Resource capability;
106    final List<String> nodes;
107    final List<String> racks;
108    final Priority priority;
109    final boolean relaxLocality;
110    final String nodeLabelsExpression;
111    
112    /**
113     * Instantiates a {@link ContainerRequest} with the given constraints and
114     * locality relaxation enabled.
115     * 
116     * @param capability
117     *          The {@link Resource} to be requested for each container.
118     * @param nodes
119     *          Any hosts to request that the containers are placed on.
120     * @param racks
121     *          Any racks to request that the containers are placed on. The
122     *          racks corresponding to any hosts requested will be automatically
123     *          added to this list.
124     * @param priority
125     *          The priority at which to request the containers. Higher
126     *          priorities have lower numerical values.
127     */
128    public ContainerRequest(Resource capability, String[] nodes,
129        String[] racks, Priority priority) {
130      this(capability, nodes, racks, priority, true, null);
131    }
132    
133    /**
134     * Instantiates a {@link ContainerRequest} with the given constraints.
135     * 
136     * @param capability
137     *          The {@link Resource} to be requested for each container.
138     * @param nodes
139     *          Any hosts to request that the containers are placed on.
140     * @param racks
141     *          Any racks to request that the containers are placed on. The
142     *          racks corresponding to any hosts requested will be automatically
143     *          added to this list.
144     * @param priority
145     *          The priority at which to request the containers. Higher
146     *          priorities have lower numerical values.
147     * @param relaxLocality
148     *          If true, containers for this request may be assigned on hosts
149     *          and racks other than the ones explicitly requested.
150     */
151    public ContainerRequest(Resource capability, String[] nodes,
152        String[] racks, Priority priority, boolean relaxLocality) {
153      this(capability, nodes, racks, priority, relaxLocality, null);
154    }
155          
156    /**
157     * Instantiates a {@link ContainerRequest} with the given constraints.
158     * 
159     * @param capability
160     *          The {@link Resource} to be requested for each container.
161     * @param nodes
162     *          Any hosts to request that the containers are placed on.
163     * @param racks
164     *          Any racks to request that the containers are placed on. The
165     *          racks corresponding to any hosts requested will be automatically
166     *          added to this list.
167     * @param priority
168     *          The priority at which to request the containers. Higher
169     *          priorities have lower numerical values.
170     * @param relaxLocality
171     *          If true, containers for this request may be assigned on hosts
172     *          and racks other than the ones explicitly requested.
173     * @param nodeLabelsExpression
174     *          Set node labels to allocate resource, now we only support
175     *          asking for only a single node label
176     */
177    public ContainerRequest(Resource capability, String[] nodes,
178        String[] racks, Priority priority, boolean relaxLocality,
179        String nodeLabelsExpression) {
180      // Validate request
181      Preconditions.checkArgument(capability != null,
182          "The Resource to be requested for each container " +
183              "should not be null ");
184      Preconditions.checkArgument(priority != null,
185          "The priority at which to request containers should not be null ");
186      Preconditions.checkArgument(
187              !(!relaxLocality && (racks == null || racks.length == 0) 
188                  && (nodes == null || nodes.length == 0)),
189              "Can't turn off locality relaxation on a " + 
190              "request with no location constraints");
191      this.capability = capability;
192      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
193      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
194      this.priority = priority;
195      this.relaxLocality = relaxLocality;
196      this.nodeLabelsExpression = nodeLabelsExpression;
197    }
198    
199    public Resource getCapability() {
200      return capability;
201    }
202    
203    public List<String> getNodes() {
204      return nodes;
205    }
206    
207    public List<String> getRacks() {
208      return racks;
209    }
210    
211    public Priority getPriority() {
212      return priority;
213    }
214    
215    public boolean getRelaxLocality() {
216      return relaxLocality;
217    }
218    
219    public String getNodeLabelExpression() {
220      return nodeLabelsExpression;
221    }
222    
223    public String toString() {
224      StringBuilder sb = new StringBuilder();
225      sb.append("Capability[").append(capability).append("]");
226      sb.append("Priority[").append(priority).append("]");
227      return sb.toString();
228    }
229  }
230 
231  /**
232   * Register the application master. This must be called before any 
233   * other interaction
234   * @param appHostName Name of the host on which master is running
235   * @param appHostPort Port master is listening on
236   * @param appTrackingUrl URL at which the master info can be seen
237   * @return <code>RegisterApplicationMasterResponse</code>
238   * @throws YarnException
239   * @throws IOException
240   */
241  public abstract RegisterApplicationMasterResponse 
242               registerApplicationMaster(String appHostName,
243                                         int appHostPort,
244                                         String appTrackingUrl) 
245               throws YarnException, IOException;
246  
247  /**
248   * Request additional containers and receive new container allocations.
249   * Requests made via <code>addContainerRequest</code> are sent to the
250   * <code>ResourceManager</code>. New containers assigned to the master are
251   * retrieved. Status of completed containers and node health updates are also
252   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
253   * must be made periodically. The call may not always return any new
254   * allocations of containers. App should not make concurrent allocate
255   * requests. May cause request loss.
256   * 
257   * <p>
258   * Note : If the user has not removed container requests that have already
259   * been satisfied, then the re-register may end up sending the entire
260   * container requests to the RM (including matched requests). Which would mean
261   * the RM could end up giving it a lot of new allocated containers.
262   * </p>
263   * 
264   * @param progressIndicator Indicates progress made by the master
265   * @return the response of the allocate request
266   * @throws YarnException
267   * @throws IOException
268   */
269  public abstract AllocateResponse allocate(float progressIndicator) 
270                           throws YarnException, IOException;
271  
272  /**
273   * Unregister the application master. This must be called in the end.
274   * @param appStatus Success/Failure status of the master
275   * @param appMessage Diagnostics message on failure
276   * @param appTrackingUrl New URL to get master info
277   * @throws YarnException
278   * @throws IOException
279   */
280  public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
281                                           String appMessage,
282                                           String appTrackingUrl) 
283               throws YarnException, IOException;
284  
285  /**
286   * Request containers for resources before calling <code>allocate</code>
287   * @param req Resource request
288   */
289  public abstract void addContainerRequest(T req);
290
291  /**
292   * Remove previous container request. The previous container request may have 
293   * already been sent to the ResourceManager. So even after the remove request 
294   * the app must be prepared to receive an allocation for the previous request 
295   * even after the remove request
296   * @param req Resource request
297   */
298  public abstract void removeContainerRequest(T req);
299
300  /**
301   * Request container resource change before calling <code>allocate</code>.
302   * Any previous pending resource change request of the same container will be
303   * removed.
304   *
305   * Application that calls this method is expected to maintain the
306   * <code>Container</code>s that are returned from previous successful
307   * allocations or resource changes. By passing in the existing container and a
308   * target resource capability to this method, the application requests the
309   * ResourceManager to change the existing resource allocation to the target
310   * resource allocation.
311   *
312   * @param container The container returned from the last successful resource
313   *                  allocation or resource change
314   * @param capability  The target resource capability of the container
315   */
316  public abstract void requestContainerResourceChange(
317      Container container, Resource capability);
318
319  /**
320   * Release containers assigned by the Resource Manager. If the app cannot use
321   * the container or wants to give up the container then it can release them.
322   * The app needs to make new requests for the released resource capability if
323   * it still needs it. eg. it released non-local resources
324   * @param containerId
325   */
326  public abstract void releaseAssignedContainer(ContainerId containerId);
327  
328  /**
329   * Get the currently available resources in the cluster.
330   * A valid value is available after a call to allocate has been made
331   * @return Currently available resources
332   */
333  public abstract Resource getAvailableResources();
334  
335  /**
336   * Get the current number of nodes in the cluster.
337   * A valid values is available after a call to allocate has been made
338   * @return Current number of nodes in the cluster
339   */
340  public abstract int getClusterNodeCount();
341
342  /**
343   * Get outstanding <code>ContainerRequest</code>s matching the given 
344   * parameters. These ContainerRequests should have been added via
345   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
346   * the AMRMClient may return its internal collection directly without creating 
347   * a copy. Users should not perform mutable operations on the return value.
348   * Each collection in the list contains requests with identical 
349   * <code>Resource</code> size that fit in the given capability. In a 
350   * collection, requests will be returned in the same order as they were added.
351   * @return Collection of request matching the parameters
352   */
353  public abstract List<? extends Collection<T>> getMatchingRequests(
354                                           Priority priority, 
355                                           String resourceName, 
356                                           Resource capability);
357  
358  /**
359   * Update application's blacklist with addition or removal resources.
360   * 
361   * @param blacklistAdditions list of resources which should be added to the 
362   *        application blacklist
363   * @param blacklistRemovals list of resources which should be removed from the 
364   *        application blacklist
365   */
366  public abstract void updateBlacklist(List<String> blacklistAdditions,
367      List<String> blacklistRemovals);
368
369  /**
370   * Set the NM token cache for the <code>AMRMClient</code>. This cache must
371   * be shared with the {@link NMClient} used to manage containers for the
372   * <code>AMRMClient</code>
373   * <p>
374   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
375   * singleton instance will be used.
376   *
377   * @param nmTokenCache the NM token cache to use.
378   */
379  public void setNMTokenCache(NMTokenCache nmTokenCache) {
380    this.nmTokenCache = nmTokenCache;
381  }
382
383  /**
384   * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
385   * shared with the {@link NMClient} used to manage containers for the
386   * <code>AMRMClient</code>.
387   * <p>
388   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
389   * singleton instance will be used.
390   *
391   * @return the NM token cache.
392   */
393  public NMTokenCache getNMTokenCache() {
394    return nmTokenCache;
395  }
396
397  /**
398   * Wait for <code>check</code> to return true for each 1000 ms.
399   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
400   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
401   * @param check
402   */
403  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
404    waitFor(check, 1000);
405  }
406
407  /**
408   * Wait for <code>check</code> to return true for each
409   * <code>checkEveryMillis</code> ms.
410   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
411   * @param check user defined checker
412   * @param checkEveryMillis interval to call <code>check</code>
413   */
414  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
415      throws InterruptedException {
416    waitFor(check, checkEveryMillis, 1);
417  }
418
419  /**
420   * Wait for <code>check</code> to return true for each
421   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
422   * the message "waiting in main loop" for each <code>logInterval</code> times
423   * iteration to confirm the thread is alive.
424   * @param check user defined checker
425   * @param checkEveryMillis interval to call <code>check</code>
426   * @param logInterval interval to log for each
427   */
428  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
429      int logInterval) throws InterruptedException {
430    Preconditions.checkNotNull(check, "check should not be null");
431    Preconditions.checkArgument(checkEveryMillis >= 0,
432        "checkEveryMillis should be positive value");
433    Preconditions.checkArgument(logInterval >= 0,
434        "logInterval should be positive value");
435
436    int loggingCounter = logInterval;
437    do {
438      if (LOG.isDebugEnabled()) {
439        LOG.debug("Check the condition for main loop.");
440      }
441
442      boolean result = check.get();
443      if (result) {
444        LOG.info("Exits the main loop.");
445        return;
446      }
447      if (--loggingCounter <= 0) {
448        LOG.info("Waiting in main loop.");
449        loggingCounter = logInterval;
450      }
451
452      Thread.sleep(checkEveryMillis);
453    } while (true);
454  }
455
456}