001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client.api;
020
021import java.io.IOException;
022import java.util.Collection;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceAudience.Private;
029import org.apache.hadoop.classification.InterfaceAudience.Public;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.service.AbstractService;
032import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
033import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
034import org.apache.hadoop.yarn.api.records.ContainerId;
035import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
036import org.apache.hadoop.yarn.api.records.Priority;
037import org.apache.hadoop.yarn.api.records.Resource;
038import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
039import org.apache.hadoop.yarn.exceptions.YarnException;
040
041import com.google.common.base.Preconditions;
042import com.google.common.base.Supplier;
043import com.google.common.collect.ImmutableList;
044
045@InterfaceAudience.Public
046@InterfaceStability.Stable
047public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
048    AbstractService {
049  private static final Log LOG = LogFactory.getLog(AMRMClient.class);
050
051  /**
052   * Create a new instance of AMRMClient.
053   * For usage:
054   * <pre>
055   * {@code
056   * AMRMClient.<T>createAMRMClientContainerRequest()
057   * }</pre>
058   * @return the newly create AMRMClient instance.
059   */
060  @Public
061  public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
062    AMRMClient<T> client = new AMRMClientImpl<T>();
063    return client;
064  }
065
066  private NMTokenCache nmTokenCache;
067
068  @Private
069  protected AMRMClient(String name) {
070    super(name);
071    nmTokenCache = NMTokenCache.getSingleton();
072  }
073
074  /**
075   * Object to represent a single container request for resources. Scheduler
076   * documentation should be consulted for the specifics of how the parameters
077   * are honored.
078   * 
079   * By default, YARN schedulers try to allocate containers at the requested
080   * locations but they may relax the constraints in order to expedite meeting
081   * allocations limits. They first relax the constraint to the same rack as the
082   * requested node and then to anywhere in the cluster. The relaxLocality flag
083   * may be used to disable locality relaxation and request containers at only 
084   * specific locations. The following conditions apply.
085   * <ul>
086   * <li>Within a priority, all container requests must have the same value for
087   * locality relaxation. Either enabled or disabled.</li>
088   * <li>If locality relaxation is disabled, then across requests, locations at
089   * different network levels may not be specified. E.g. its invalid to make a
090   * request for a specific node and another request for a specific rack.</li>
091   * <li>If locality relaxation is disabled, then only within the same request,  
092   * a node and its rack may be specified together. This allows for a specific   
093   * rack with a preference for a specific node within that rack.</li>
094   * <li></li>
095   * </ul>
096   * To re-enable locality relaxation at a given priority, all pending requests 
097   * with locality relaxation disabled must be first removed. Then they can be 
098   * added back with locality relaxation enabled.
099   * 
100   * All getters return immutable values.
101   */
102  public static class ContainerRequest {
103    final Resource capability;
104    final List<String> nodes;
105    final List<String> racks;
106    final Priority priority;
107    final boolean relaxLocality;
108    final String nodeLabelsExpression;
109    
110    /**
111     * Instantiates a {@link ContainerRequest} with the given constraints and
112     * locality relaxation enabled.
113     * 
114     * @param capability
115     *          The {@link Resource} to be requested for each container.
116     * @param nodes
117     *          Any hosts to request that the containers are placed on.
118     * @param racks
119     *          Any racks to request that the containers are placed on. The
120     *          racks corresponding to any hosts requested will be automatically
121     *          added to this list.
122     * @param priority
123     *          The priority at which to request the containers. Higher
124     *          priorities have lower numerical values.
125     */
126    public ContainerRequest(Resource capability, String[] nodes,
127        String[] racks, Priority priority) {
128      this(capability, nodes, racks, priority, true, null);
129    }
130    
131    /**
132     * Instantiates a {@link ContainerRequest} with the given constraints.
133     * 
134     * @param capability
135     *          The {@link Resource} to be requested for each container.
136     * @param nodes
137     *          Any hosts to request that the containers are placed on.
138     * @param racks
139     *          Any racks to request that the containers are placed on. The
140     *          racks corresponding to any hosts requested will be automatically
141     *          added to this list.
142     * @param priority
143     *          The priority at which to request the containers. Higher
144     *          priorities have lower numerical values.
145     * @param relaxLocality
146     *          If true, containers for this request may be assigned on hosts
147     *          and racks other than the ones explicitly requested.
148     */
149    public ContainerRequest(Resource capability, String[] nodes,
150        String[] racks, Priority priority, boolean relaxLocality) {
151      this(capability, nodes, racks, priority, relaxLocality, null);
152    }
153          
154    /**
155     * Instantiates a {@link ContainerRequest} with the given constraints.
156     * 
157     * @param capability
158     *          The {@link Resource} to be requested for each container.
159     * @param nodes
160     *          Any hosts to request that the containers are placed on.
161     * @param racks
162     *          Any racks to request that the containers are placed on. The
163     *          racks corresponding to any hosts requested will be automatically
164     *          added to this list.
165     * @param priority
166     *          The priority at which to request the containers. Higher
167     *          priorities have lower numerical values.
168     * @param relaxLocality
169     *          If true, containers for this request may be assigned on hosts
170     *          and racks other than the ones explicitly requested.
171     * @param nodeLabelsExpression
172     *          Set node labels to allocate resource, now we only support
173     *          asking for only a single node label
174     */
175    public ContainerRequest(Resource capability, String[] nodes,
176        String[] racks, Priority priority, boolean relaxLocality,
177        String nodeLabelsExpression) {
178      // Validate request
179      Preconditions.checkArgument(capability != null,
180          "The Resource to be requested for each container " +
181              "should not be null ");
182      Preconditions.checkArgument(priority != null,
183          "The priority at which to request containers should not be null ");
184      Preconditions.checkArgument(
185              !(!relaxLocality && (racks == null || racks.length == 0) 
186                  && (nodes == null || nodes.length == 0)),
187              "Can't turn off locality relaxation on a " + 
188              "request with no location constraints");
189      this.capability = capability;
190      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
191      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
192      this.priority = priority;
193      this.relaxLocality = relaxLocality;
194      this.nodeLabelsExpression = nodeLabelsExpression;
195    }
196    
197    public Resource getCapability() {
198      return capability;
199    }
200    
201    public List<String> getNodes() {
202      return nodes;
203    }
204    
205    public List<String> getRacks() {
206      return racks;
207    }
208    
209    public Priority getPriority() {
210      return priority;
211    }
212    
213    public boolean getRelaxLocality() {
214      return relaxLocality;
215    }
216    
217    public String getNodeLabelExpression() {
218      return nodeLabelsExpression;
219    }
220    
221    public String toString() {
222      StringBuilder sb = new StringBuilder();
223      sb.append("Capability[").append(capability).append("]");
224      sb.append("Priority[").append(priority).append("]");
225      return sb.toString();
226    }
227  }
228 
229  /**
230   * Register the application master. This must be called before any 
231   * other interaction
232   * @param appHostName Name of the host on which master is running
233   * @param appHostPort Port master is listening on
234   * @param appTrackingUrl URL at which the master info can be seen
235   * @return <code>RegisterApplicationMasterResponse</code>
236   * @throws YarnException
237   * @throws IOException
238   */
239  public abstract RegisterApplicationMasterResponse 
240               registerApplicationMaster(String appHostName,
241                                         int appHostPort,
242                                         String appTrackingUrl) 
243               throws YarnException, IOException;
244  
245  /**
246   * Request additional containers and receive new container allocations.
247   * Requests made via <code>addContainerRequest</code> are sent to the
248   * <code>ResourceManager</code>. New containers assigned to the master are
249   * retrieved. Status of completed containers and node health updates are also
250   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
251   * must be made periodically. The call may not always return any new
252   * allocations of containers. App should not make concurrent allocate
253   * requests. May cause request loss.
254   * 
255   * <p>
256   * Note : If the user has not removed container requests that have already
257   * been satisfied, then the re-register may end up sending the entire
258   * container requests to the RM (including matched requests). Which would mean
259   * the RM could end up giving it a lot of new allocated containers.
260   * </p>
261   * 
262   * @param progressIndicator Indicates progress made by the master
263   * @return the response of the allocate request
264   * @throws YarnException
265   * @throws IOException
266   */
267  public abstract AllocateResponse allocate(float progressIndicator) 
268                           throws YarnException, IOException;
269  
270  /**
271   * Unregister the application master. This must be called in the end.
272   * @param appStatus Success/Failure status of the master
273   * @param appMessage Diagnostics message on failure
274   * @param appTrackingUrl New URL to get master info
275   * @throws YarnException
276   * @throws IOException
277   */
278  public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
279                                           String appMessage,
280                                           String appTrackingUrl) 
281               throws YarnException, IOException;
282  
283  /**
284   * Request containers for resources before calling <code>allocate</code>
285   * @param req Resource request
286   */
287  public abstract void addContainerRequest(T req);
288  
289  /**
290   * Remove previous container request. The previous container request may have 
291   * already been sent to the ResourceManager. So even after the remove request 
292   * the app must be prepared to receive an allocation for the previous request 
293   * even after the remove request
294   * @param req Resource request
295   */
296  public abstract void removeContainerRequest(T req);
297  
298  /**
299   * Release containers assigned by the Resource Manager. If the app cannot use
300   * the container or wants to give up the container then it can release them.
301   * The app needs to make new requests for the released resource capability if
302   * it still needs it. eg. it released non-local resources
303   * @param containerId
304   */
305  public abstract void releaseAssignedContainer(ContainerId containerId);
306  
307  /**
308   * Get the currently available resources in the cluster.
309   * A valid value is available after a call to allocate has been made
310   * @return Currently available resources
311   */
312  public abstract Resource getAvailableResources();
313  
314  /**
315   * Get the current number of nodes in the cluster.
316   * A valid values is available after a call to allocate has been made
317   * @return Current number of nodes in the cluster
318   */
319  public abstract int getClusterNodeCount();
320
321  /**
322   * Get outstanding <code>ContainerRequest</code>s matching the given 
323   * parameters. These ContainerRequests should have been added via
324   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
325   * the AMRMClient may return its internal collection directly without creating 
326   * a copy. Users should not perform mutable operations on the return value.
327   * Each collection in the list contains requests with identical 
328   * <code>Resource</code> size that fit in the given capability. In a 
329   * collection, requests will be returned in the same order as they were added.
330   * @return Collection of request matching the parameters
331   */
332  public abstract List<? extends Collection<T>> getMatchingRequests(
333                                           Priority priority, 
334                                           String resourceName, 
335                                           Resource capability);
336  
337  /**
338   * Update application's blacklist with addition or removal resources.
339   * 
340   * @param blacklistAdditions list of resources which should be added to the 
341   *        application blacklist
342   * @param blacklistRemovals list of resources which should be removed from the 
343   *        application blacklist
344   */
345  public abstract void updateBlacklist(List<String> blacklistAdditions,
346      List<String> blacklistRemovals);
347
348  /**
349   * Set the NM token cache for the <code>AMRMClient</code>. This cache must
350   * be shared with the {@link NMClient} used to manage containers for the
351   * <code>AMRMClient</code>
352   * <p/>
353   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
354   * singleton instance will be used.
355   *
356   * @param nmTokenCache the NM token cache to use.
357   */
358  public void setNMTokenCache(NMTokenCache nmTokenCache) {
359    this.nmTokenCache = nmTokenCache;
360  }
361
362  /**
363   * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
364   * shared with the {@link NMClient} used to manage containers for the
365   * <code>AMRMClient</code>.
366   * <p/>
367   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
368   * singleton instance will be used.
369   *
370   * @return the NM token cache.
371   */
372  public NMTokenCache getNMTokenCache() {
373    return nmTokenCache;
374  }
375
376  /**
377   * Wait for <code>check</code> to return true for each 1000 ms.
378   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
379   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
380   * @param check
381   */
382  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
383    waitFor(check, 1000);
384  }
385
386  /**
387   * Wait for <code>check</code> to return true for each
388   * <code>checkEveryMillis</code> ms.
389   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
390   * @param check user defined checker
391   * @param checkEveryMillis interval to call <code>check</code>
392   */
393  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
394      throws InterruptedException {
395    waitFor(check, checkEveryMillis, 1);
396  }
397
398  /**
399   * Wait for <code>check</code> to return true for each
400   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
401   * the message "waiting in main loop" for each <code>logInterval</code> times
402   * iteration to confirm the thread is alive.
403   * @param check user defined checker
404   * @param checkEveryMillis interval to call <code>check</code>
405   * @param logInterval interval to log for each
406   */
407  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
408      int logInterval) throws InterruptedException {
409    Preconditions.checkNotNull(check, "check should not be null");
410    Preconditions.checkArgument(checkEveryMillis >= 0,
411        "checkEveryMillis should be positive value");
412    Preconditions.checkArgument(logInterval >= 0,
413        "logInterval should be positive value");
414
415    int loggingCounter = logInterval;
416    do {
417      if (LOG.isDebugEnabled()) {
418        LOG.debug("Check the condition for main loop.");
419      }
420
421      boolean result = check.get();
422      if (result) {
423        LOG.info("Exits the main loop.");
424        return;
425      }
426      if (--loggingCounter <= 0) {
427        LOG.info("Waiting in main loop.");
428        loggingCounter = logInterval;
429      }
430
431      Thread.sleep(checkEveryMillis);
432    } while (true);
433  }
434
435}