/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
018
019package org.apache.hadoop.yarn.client.api;
020
021import java.io.IOException;
022import java.util.Collection;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceAudience.Private;
029import org.apache.hadoop.classification.InterfaceAudience.Public;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.service.AbstractService;
032import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
033import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
034
035import org.apache.hadoop.yarn.api.records.Container;
036import org.apache.hadoop.yarn.api.records.ContainerId;
037import org.apache.hadoop.yarn.api.records.ExecutionType;
038import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
039import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
040import org.apache.hadoop.yarn.api.records.Priority;
041import org.apache.hadoop.yarn.api.records.Resource;
042import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
043import org.apache.hadoop.yarn.exceptions.YarnException;
044
045import com.google.common.base.Preconditions;
046import com.google.common.base.Supplier;
047import com.google.common.collect.ImmutableList;
048
049@InterfaceAudience.Public
050@InterfaceStability.Stable
051public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
052    AbstractService {
053  private static final Log LOG = LogFactory.getLog(AMRMClient.class);
054
055  private TimelineClient timelineClient;
056
057  /**
058   * Create a new instance of AMRMClient.
059   * For usage:
060   * <pre>
061   * {@code
062   * AMRMClient.<T>createAMRMClientContainerRequest()
063   * }</pre>
064   * @return the newly create AMRMClient instance.
065   */
066  @Public
067  public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
068    AMRMClient<T> client = new AMRMClientImpl<T>();
069    return client;
070  }
071
072  private NMTokenCache nmTokenCache;
073
074  @Private
075  protected AMRMClient(String name) {
076    super(name);
077    nmTokenCache = NMTokenCache.getSingleton();
078  }
079
080  /**
081   * Object to represent a single container request for resources. Scheduler
082   * documentation should be consulted for the specifics of how the parameters
083   * are honored.
084   * 
085   * By default, YARN schedulers try to allocate containers at the requested
086   * locations but they may relax the constraints in order to expedite meeting
087   * allocations limits. They first relax the constraint to the same rack as the
088   * requested node and then to anywhere in the cluster. The relaxLocality flag
089   * may be used to disable locality relaxation and request containers at only 
090   * specific locations. The following conditions apply.
091   * <ul>
092   * <li>Within a priority, all container requests must have the same value for
093   * locality relaxation. Either enabled or disabled.</li>
094   * <li>If locality relaxation is disabled, then across requests, locations at
095   * different network levels may not be specified. E.g. its invalid to make a
096   * request for a specific node and another request for a specific rack.</li>
097   * <li>If locality relaxation is disabled, then only within the same request,  
098   * a node and its rack may be specified together. This allows for a specific   
099   * rack with a preference for a specific node within that rack.</li>
100   * <li></li>
101   * </ul>
102   * To re-enable locality relaxation at a given priority, all pending requests 
103   * with locality relaxation disabled must be first removed. Then they can be 
104   * added back with locality relaxation enabled.
105   * 
106   * All getters return immutable values.
107   */
108  public static class ContainerRequest {
109    final Resource capability;
110    final List<String> nodes;
111    final List<String> racks;
112    final Priority priority;
113    final long allocationRequestId;
114    final boolean relaxLocality;
115    final String nodeLabelsExpression;
116    final ExecutionTypeRequest executionTypeRequest;
117    
118    /**
119     * Instantiates a {@link ContainerRequest} with the given constraints and
120     * locality relaxation enabled.
121     * 
122     * @param capability
123     *          The {@link Resource} to be requested for each container.
124     * @param nodes
125     *          Any hosts to request that the containers are placed on.
126     * @param racks
127     *          Any racks to request that the containers are placed on. The
128     *          racks corresponding to any hosts requested will be automatically
129     *          added to this list.
130     * @param priority
131     *          The priority at which to request the containers. Higher
132     *          priorities have lower numerical values.
133     */
134    public ContainerRequest(Resource capability, String[] nodes,
135        String[] racks, Priority priority) {
136      this(capability, nodes, racks, priority, true, null);
137    }
138
139    /**
140     * Instantiates a {@link ContainerRequest} with the given constraints and
141     * locality relaxation enabled.
142     *
143     * @param capability
144     *          The {@link Resource} to be requested for each container.
145     * @param nodes
146     *          Any hosts to request that the containers are placed on.
147     * @param racks
148     *          Any racks to request that the containers are placed on. The
149     *          racks corresponding to any hosts requested will be automatically
150     *          added to this list.
151     * @param priority
152     *          The priority at which to request the containers. Higher
153     *          priorities have lower numerical values.
154     * @param allocationRequestId Allocation Request Id
155     */
156    @Public
157    @InterfaceStability.Evolving
158    public ContainerRequest(Resource capability, String[] nodes,
159        String[] racks, Priority priority, long allocationRequestId) {
160      this(capability, nodes, racks, priority, allocationRequestId, true, null,
161          ExecutionTypeRequest.newInstance());
162    }
163    
164    /**
165     * Instantiates a {@link ContainerRequest} with the given constraints.
166     * 
167     * @param capability
168     *          The {@link Resource} to be requested for each container.
169     * @param nodes
170     *          Any hosts to request that the containers are placed on.
171     * @param racks
172     *          Any racks to request that the containers are placed on. The
173     *          racks corresponding to any hosts requested will be automatically
174     *          added to this list.
175     * @param priority
176     *          The priority at which to request the containers. Higher
177     *          priorities have lower numerical values.
178     * @param relaxLocality
179     *          If true, containers for this request may be assigned on hosts
180     *          and racks other than the ones explicitly requested.
181     */
182    public ContainerRequest(Resource capability, String[] nodes,
183        String[] racks, Priority priority, boolean relaxLocality) {
184      this(capability, nodes, racks, priority, relaxLocality, null);
185    }
186
187    /**
188     * Instantiates a {@link ContainerRequest} with the given constraints.
189     *
190     * @param capability
191     *          The {@link Resource} to be requested for each container.
192     * @param nodes
193     *          Any hosts to request that the containers are placed on.
194     * @param racks
195     *          Any racks to request that the containers are placed on. The
196     *          racks corresponding to any hosts requested will be automatically
197     *          added to this list.
198     * @param priority
199     *          The priority at which to request the containers. Higher
200     *          priorities have lower numerical values.
201     * @param relaxLocality
202     *          If true, containers for this request may be assigned on hosts
203     *          and racks other than the ones explicitly requested.
204     * @param allocationRequestId Allocation Request Id
205     */
206    @Public
207    @InterfaceStability.Evolving
208    public ContainerRequest(Resource capability, String[] nodes,
209        String[] racks, Priority priority, long allocationRequestId,
210        boolean relaxLocality) {
211      this(capability, nodes, racks, priority, allocationRequestId,
212          relaxLocality, null, ExecutionTypeRequest.newInstance());
213    }
214
215    /**
216     * Instantiates a {@link ContainerRequest} with the given constraints.
217     *
218     * @param capability
219     *          The {@link Resource} to be requested for each container.
220     * @param nodes
221     *          Any hosts to request that the containers are placed on.
222     * @param racks
223     *          Any racks to request that the containers are placed on. The
224     *          racks corresponding to any hosts requested will be automatically
225     *          added to this list.
226     * @param priority
227     *          The priority at which to request the containers. Higher
228     *          priorities have lower numerical values.
229     * @param relaxLocality
230     *          If true, containers for this request may be assigned on hosts
231     *          and racks other than the ones explicitly requested.
232     * @param nodeLabelsExpression
233     *          Set node labels to allocate resource, now we only support
234     *          asking for only a single node label
235     */
236    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
237        Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
238      this(capability, nodes, racks, priority, 0, relaxLocality,
239          nodeLabelsExpression,
240          ExecutionTypeRequest.newInstance());
241    }
242
243    /**
244     * Instantiates a {@link ContainerRequest} with the given constraints.
245     *
246     * @param capability
247     *          The {@link Resource} to be requested for each container.
248     * @param nodes
249     *          Any hosts to request that the containers are placed on.
250     * @param racks
251     *          Any racks to request that the containers are placed on. The
252     *          racks corresponding to any hosts requested will be automatically
253     *          added to this list.
254     * @param priority
255     *          The priority at which to request the containers. Higher
256     *          priorities have lower numerical values.
257     * @param allocationRequestId
258     *          The allocationRequestId of the request. To be used as a tracking
259     *          id to match Containers allocated against this request. Will
260     *          default to 0 if not specified.
261     * @param relaxLocality
262     *          If true, containers for this request may be assigned on hosts
263     *          and racks other than the ones explicitly requested.
264     * @param nodeLabelsExpression
265     *          Set node labels to allocate resource, now we only support
266     *          asking for only a single node label
267     */
268    @Public
269    @InterfaceStability.Evolving
270    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
271        Priority priority, long allocationRequestId, boolean relaxLocality,
272        String nodeLabelsExpression) {
273      this(capability, nodes, racks, priority, allocationRequestId,
274          relaxLocality, nodeLabelsExpression,
275          ExecutionTypeRequest.newInstance());
276    }
277          
278    /**
279     * Instantiates a {@link ContainerRequest} with the given constraints.
280     * 
281     * @param capability
282     *          The {@link Resource} to be requested for each container.
283     * @param nodes
284     *          Any hosts to request that the containers are placed on.
285     * @param racks
286     *          Any racks to request that the containers are placed on. The
287     *          racks corresponding to any hosts requested will be automatically
288     *          added to this list.
289     * @param priority
290     *          The priority at which to request the containers. Higher
291     *          priorities have lower numerical values.
292     * @param allocationRequestId
293     *          The allocationRequestId of the request. To be used as a tracking
294     *          id to match Containers allocated against this request. Will
295     *          default to 0 if not specified.
296     * @param relaxLocality
297     *          If true, containers for this request may be assigned on hosts
298     *          and racks other than the ones explicitly requested.
299     * @param nodeLabelsExpression
300     *          Set node labels to allocate resource, now we only support
301     *          asking for only a single node label
302     * @param executionTypeRequest
303     *          Set the execution type of the container request.
304     */
305    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
306        Priority priority, long allocationRequestId, boolean relaxLocality,
307        String nodeLabelsExpression,
308        ExecutionTypeRequest executionTypeRequest) {
309      // Validate request
310      Preconditions.checkArgument(capability != null,
311          "The Resource to be requested for each container " +
312              "should not be null ");
313      Preconditions.checkArgument(priority != null,
314          "The priority at which to request containers should not be null ");
315      Preconditions.checkArgument(
316              !(!relaxLocality && (racks == null || racks.length == 0) 
317                  && (nodes == null || nodes.length == 0)),
318              "Can't turn off locality relaxation on a " + 
319              "request with no location constraints");
320      this.allocationRequestId = allocationRequestId;
321      this.capability = capability;
322      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
323      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
324      this.priority = priority;
325      this.relaxLocality = relaxLocality;
326      this.nodeLabelsExpression = nodeLabelsExpression;
327      this.executionTypeRequest = executionTypeRequest;
328    }
329    
330    public Resource getCapability() {
331      return capability;
332    }
333    
334    public List<String> getNodes() {
335      return nodes;
336    }
337    
338    public List<String> getRacks() {
339      return racks;
340    }
341    
342    public Priority getPriority() {
343      return priority;
344    }
345
346    public long getAllocationRequestId() {
347      return allocationRequestId;
348    }
349    
350    public boolean getRelaxLocality() {
351      return relaxLocality;
352    }
353    
354    public String getNodeLabelExpression() {
355      return nodeLabelsExpression;
356    }
357    
358    public ExecutionTypeRequest getExecutionTypeRequest() {
359      return executionTypeRequest;
360    }
361
362    public String toString() {
363      StringBuilder sb = new StringBuilder();
364      sb.append("Capability[").append(capability).append("]");
365      sb.append("Priority[").append(priority).append("]");
366      sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
367      sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
368          .append("]");
369      return sb.toString();
370    }
371  }
372 
373  /**
374   * Register the application master. This must be called before any 
375   * other interaction
376   * @param appHostName Name of the host on which master is running
377   * @param appHostPort Port master is listening on
378   * @param appTrackingUrl URL at which the master info can be seen
379   * @return <code>RegisterApplicationMasterResponse</code>
380   * @throws YarnException
381   * @throws IOException
382   */
383  public abstract RegisterApplicationMasterResponse 
384               registerApplicationMaster(String appHostName,
385                                         int appHostPort,
386                                         String appTrackingUrl) 
387               throws YarnException, IOException;
388  
389  /**
390   * Request additional containers and receive new container allocations.
391   * Requests made via <code>addContainerRequest</code> are sent to the
392   * <code>ResourceManager</code>. New containers assigned to the master are
393   * retrieved. Status of completed containers and node health updates are also
394   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
395   * must be made periodically. The call may not always return any new
396   * allocations of containers. App should not make concurrent allocate
397   * requests. May cause request loss.
398   * 
399   * <p>
400   * Note : If the user has not removed container requests that have already
401   * been satisfied, then the re-register may end up sending the entire
402   * container requests to the RM (including matched requests). Which would mean
403   * the RM could end up giving it a lot of new allocated containers.
404   * </p>
405   * 
406   * @param progressIndicator Indicates progress made by the master
407   * @return the response of the allocate request
408   * @throws YarnException
409   * @throws IOException
410   */
411  public abstract AllocateResponse allocate(float progressIndicator) 
412                           throws YarnException, IOException;
413  
414  /**
415   * Unregister the application master. This must be called in the end.
416   * @param appStatus Success/Failure status of the master
417   * @param appMessage Diagnostics message on failure
418   * @param appTrackingUrl New URL to get master info
419   * @throws YarnException
420   * @throws IOException
421   */
422  public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
423                                           String appMessage,
424                                           String appTrackingUrl) 
425               throws YarnException, IOException;
426  
427  /**
428   * Request containers for resources before calling <code>allocate</code>
429   * @param req Resource request
430   */
431  public abstract void addContainerRequest(T req);
432
433  /**
434   * Remove previous container request. The previous container request may have 
435   * already been sent to the ResourceManager. So even after the remove request 
436   * the app must be prepared to receive an allocation for the previous request 
437   * even after the remove request
438   * @param req Resource request
439   */
440  public abstract void removeContainerRequest(T req);
441
442  /**
443   * Request container resource change before calling <code>allocate</code>.
444   * Any previous pending resource change request of the same container will be
445   * removed.
446   *
447   * Application that calls this method is expected to maintain the
448   * <code>Container</code>s that are returned from previous successful
449   * allocations or resource changes. By passing in the existing container and a
450   * target resource capability to this method, the application requests the
451   * ResourceManager to change the existing resource allocation to the target
452   * resource allocation.
453   *
454   * @param container The container returned from the last successful resource
455   *                  allocation or resource change
456   * @param capability  The target resource capability of the container
457   */
458  public abstract void requestContainerResourceChange(
459      Container container, Resource capability);
460
461  /**
462   * Release containers assigned by the Resource Manager. If the app cannot use
463   * the container or wants to give up the container then it can release them.
464   * The app needs to make new requests for the released resource capability if
465   * it still needs it. eg. it released non-local resources
466   * @param containerId
467   */
468  public abstract void releaseAssignedContainer(ContainerId containerId);
469  
470  /**
471   * Get the currently available resources in the cluster.
472   * A valid value is available after a call to allocate has been made
473   * @return Currently available resources
474   */
475  public abstract Resource getAvailableResources();
476  
477  /**
478   * Get the current number of nodes in the cluster.
479   * A valid values is available after a call to allocate has been made
480   * @return Current number of nodes in the cluster
481   */
482  public abstract int getClusterNodeCount();
483
484  /**
485   * Get outstanding <code>ContainerRequest</code>s matching the given 
486   * parameters. These ContainerRequests should have been added via
487   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
488   * the AMRMClient may return its internal collection directly without creating 
489   * a copy. Users should not perform mutable operations on the return value.
490   * Each collection in the list contains requests with identical 
491   * <code>Resource</code> size that fit in the given capability. In a 
492   * collection, requests will be returned in the same order as they were added.
493   *
494   * NOTE: This API only matches Container requests that were created by the
495   * client WITHOUT the allocationRequestId being set.
496   *
497   * @return Collection of request matching the parameters
498   */
499  @InterfaceStability.Evolving
500  public abstract List<? extends Collection<T>> getMatchingRequests(
501                                           Priority priority, 
502                                           String resourceName, 
503                                           Resource capability);
504
505  /**
506   * Get outstanding <code>ContainerRequest</code>s matching the given
507   * parameters. These ContainerRequests should have been added via
508   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
509   * the AMRMClient may return its internal collection directly without creating
510   * a copy. Users should not perform mutable operations on the return value.
511   * Each collection in the list contains requests with identical
512   * <code>Resource</code> size that fit in the given capability. In a
513   * collection, requests will be returned in the same order as they were added.
514   * specify an <code>ExecutionType</code>.
515   *
516   * NOTE: This API only matches Container requests that were created by the
517   * client WITHOUT the allocationRequestId being set.
518   *
519   * @param priority Priority
520   * @param resourceName Location
521   * @param executionType ExecutionType
522   * @param capability Capability
523   * @return Collection of request matching the parameters
524   */
525  @InterfaceStability.Evolving
526  public List<? extends Collection<T>> getMatchingRequests(
527      Priority priority, String resourceName, ExecutionType executionType,
528      Resource capability) {
529    throw new UnsupportedOperationException("The sub-class extending" +
530        " AMRMClient is expected to implement this !!");
531  }
532
533  /**
534   * Get outstanding <code>ContainerRequest</code>s matching the given
535   * allocationRequestId. These ContainerRequests should have been added via
536   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
537   * the AMRMClient may return its internal collection directly without creating
538   * a copy. Users should not perform mutable operations on the return value.
539   *
540   * NOTE: This API only matches Container requests that were created by the
541   * client WITH the allocationRequestId being set to a non-default value.
542   *
543   * @param allocationRequestId Allocation Request Id
544   * @return Collection of request matching the parameters
545   */
546  @InterfaceStability.Evolving
547  public abstract Collection<T> getMatchingRequests(long allocationRequestId);
548
549  /**
550   * Update application's blacklist with addition or removal resources.
551   * 
552   * @param blacklistAdditions list of resources which should be added to the 
553   *        application blacklist
554   * @param blacklistRemovals list of resources which should be removed from the 
555   *        application blacklist
556   */
557  public abstract void updateBlacklist(List<String> blacklistAdditions,
558      List<String> blacklistRemovals);
559
560  /**
561   * Set the NM token cache for the <code>AMRMClient</code>. This cache must
562   * be shared with the {@link NMClient} used to manage containers for the
563   * <code>AMRMClient</code>
564   * <p>
565   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
566   * singleton instance will be used.
567   *
568   * @param nmTokenCache the NM token cache to use.
569   */
570  public void setNMTokenCache(NMTokenCache nmTokenCache) {
571    this.nmTokenCache = nmTokenCache;
572  }
573
574  /**
575   * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
576   * shared with the {@link NMClient} used to manage containers for the
577   * <code>AMRMClient</code>.
578   * <p>
579   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
580   * singleton instance will be used.
581   *
582   * @return the NM token cache.
583   */
584  public NMTokenCache getNMTokenCache() {
585    return nmTokenCache;
586  }
587
588  /**
589   * Register TimelineClient to AMRMClient.
590   * @param client the timeline client to register
591   */
592  public void registerTimelineClient(TimelineClient client) {
593    this.timelineClient = client;
594  }
595
596  /**
597   * Get registered timeline client.
598   * @return the registered timeline client
599   */
600  public TimelineClient getRegisteredTimelineClient() {
601    return this.timelineClient;
602  }
603
604  /**
605   * Wait for <code>check</code> to return true for each 1000 ms.
606   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
607   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
608   * @param check the condition for which it should wait
609   */
610  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
611    waitFor(check, 1000);
612  }
613
614  /**
615   * Wait for <code>check</code> to return true for each
616   * <code>checkEveryMillis</code> ms.
617   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
618   * @param check user defined checker
619   * @param checkEveryMillis interval to call <code>check</code>
620   */
621  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
622      throws InterruptedException {
623    waitFor(check, checkEveryMillis, 1);
624  }
625
626  /**
627   * Wait for <code>check</code> to return true for each
628   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
629   * the message "waiting in main loop" for each <code>logInterval</code> times
630   * iteration to confirm the thread is alive.
631   * @param check user defined checker
632   * @param checkEveryMillis interval to call <code>check</code>
633   * @param logInterval interval to log for each
634   */
635  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
636      int logInterval) throws InterruptedException {
637    Preconditions.checkNotNull(check, "check should not be null");
638    Preconditions.checkArgument(checkEveryMillis >= 0,
639        "checkEveryMillis should be positive value");
640    Preconditions.checkArgument(logInterval >= 0,
641        "logInterval should be positive value");
642
643    int loggingCounter = logInterval;
644    do {
645      if (LOG.isDebugEnabled()) {
646        LOG.debug("Check the condition for main loop.");
647      }
648
649      boolean result = check.get();
650      if (result) {
651        LOG.info("Exits the main loop.");
652        return;
653      }
654      if (--loggingCounter <= 0) {
655        LOG.info("Waiting in main loop.");
656        loggingCounter = logInterval;
657      }
658
659      Thread.sleep(checkEveryMillis);
660    } while (true);
661  }
662
663}