/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
018
019package org.apache.hadoop.yarn.client.api.async;
020
021import com.google.common.base.Preconditions;
022import com.google.common.base.Supplier;
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Private;
031import org.apache.hadoop.classification.InterfaceAudience.Public;
032import org.apache.hadoop.classification.InterfaceStability.Stable;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
035import org.apache.hadoop.yarn.api.records.Container;
036import org.apache.hadoop.yarn.api.records.ContainerId;
037import org.apache.hadoop.yarn.api.records.ContainerStatus;
038import org.apache.hadoop.yarn.api.records.ExecutionType;
039import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
040import org.apache.hadoop.yarn.api.records.NodeReport;
041import org.apache.hadoop.yarn.api.records.Priority;
042import org.apache.hadoop.yarn.api.records.Resource;
043import org.apache.hadoop.yarn.client.api.AMRMClient;
044import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
045import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
046import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
047import org.apache.hadoop.yarn.client.api.TimelineClient;
048import org.apache.hadoop.yarn.exceptions.YarnException;
049
050import com.google.common.annotations.VisibleForTesting;
051
052/**
053 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
054 * and provides asynchronous updates on events such as container allocations and
055 * completions.  It contains a thread that sends periodic heartbeats to the
056 * ResourceManager.
057 * 
058 * It should be used by implementing a CallbackHandler:
059 * <pre>
060 * {@code
061 * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
062 *   public void onContainersAllocated(List<Container> containers) {
063 *     [run tasks on the containers]
064 *   }
065 *
066 *   public void onContainersResourceChanged(List<Container> containers) {
067 *     [determine if resource allocation of containers have been increased in
068 *      the ResourceManager, and if so, inform the NodeManagers to increase the
069 *      resource monitor/enforcement on the containers]
070 *   }
071 *
072 *   public void onContainersCompleted(List<ContainerStatus> statuses) {
073 *     [update progress, check whether app is done]
074 *   }
075 *   
076 *   public void onNodesUpdated(List<NodeReport> updated) {}
077 *   
078 *   public void onReboot() {}
079 * }
080 * }
081 * </pre>
082 * 
083 * The client's lifecycle should be managed similarly to the following:
084 * 
085 * <pre>
086 * {@code
087 * AMRMClientAsync asyncClient = 
088 *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
089 * asyncClient.init(conf);
090 * asyncClient.start();
091 * RegisterApplicationMasterResponse response = asyncClient
092 *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
093 *       appMasterTrackingUrl);
094 * asyncClient.addContainerRequest(containerRequest);
095 * [... wait for application to complete]
096 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
097 * asyncClient.stop();
098 * }
099 * </pre>
100 */
101@Public
102@Stable
103public abstract class AMRMClientAsync<T extends ContainerRequest> 
104extends AbstractService {
105  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
106  
107  protected final AMRMClient<T> client;
108  protected final CallbackHandler handler;
109  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
110
111  /**
112   * <p>Create a new instance of AMRMClientAsync.</p>
113   *
114   * @param intervalMs heartbeat interval in milliseconds between AM and RM
115   * @param callbackHandler callback handler that processes responses from
116   *                        the <code>ResourceManager</code>
117   */
118  public static <T extends ContainerRequest> AMRMClientAsync<T>
119      createAMRMClientAsync(
120      int intervalMs, AbstractCallbackHandler callbackHandler) {
121    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
122  }
123
124  /**
125   * <p>Create a new instance of AMRMClientAsync.</p>
126   *
127   * @param client the AMRMClient instance
128   * @param intervalMs heartbeat interval in milliseconds between AM and RM
129   * @param callbackHandler callback handler that processes responses from
130   *                        the <code>ResourceManager</code>
131   */
132  public static <T extends ContainerRequest> AMRMClientAsync<T>
133      createAMRMClientAsync(
134      AMRMClient<T> client, int intervalMs,
135      AbstractCallbackHandler callbackHandler) {
136    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
137  }
138
139  protected AMRMClientAsync(
140      int intervalMs, AbstractCallbackHandler callbackHandler) {
141    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
142  }
143
144  @Private
145  @VisibleForTesting
146  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
147      AbstractCallbackHandler callbackHandler) {
148    super(AMRMClientAsync.class.getName());
149    this.client = client;
150    this.heartbeatIntervalMs.set(intervalMs);
151    this.handler = callbackHandler;
152  }
153
154  /**
155   *
156   * @deprecated Use {@link #createAMRMClientAsync(int,
157   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
158   */
159  @Deprecated
160  public static <T extends ContainerRequest> AMRMClientAsync<T>
161      createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
162    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
163  }
164
165  /**
166   *
167   * @deprecated Use {@link #createAMRMClientAsync(AMRMClient,
168   *             int, AMRMClientAsync.AbstractCallbackHandler)} instead.
169   */
170  @Deprecated
171  public static <T extends ContainerRequest> AMRMClientAsync<T>
172      createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
173          CallbackHandler callbackHandler) {
174    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
175  }
176
177  @Deprecated
178  protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
179    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
180  }
181  
182  @Private
183  @VisibleForTesting
184  @Deprecated
185  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
186      CallbackHandler callbackHandler) {
187    super(AMRMClientAsync.class.getName());
188    this.client = client;
189    this.heartbeatIntervalMs.set(intervalMs);
190    this.handler = callbackHandler;
191  }
192    
193  public void setHeartbeatInterval(int interval) {
194    heartbeatIntervalMs.set(interval);
195  }
196  
197  public abstract List<? extends Collection<T>> getMatchingRequests(
198                                                   Priority priority, 
199                                                   String resourceName, 
200                                                   Resource capability);
201
202  /**
203   * Returns all matching ContainerRequests that match the given Priority,
204   * ResourceName, ExecutionType and Capability.
205   *
206   * NOTE: This matches only requests that were made by the client WITHOUT the
207   * allocationRequestId specified.
208   *
209   * @param priority Priority.
210   * @param resourceName Location.
211   * @param executionType ExecutionType.
212   * @param capability Capability.
213   * @return All matching ContainerRequests
214   */
215  public List<? extends Collection<T>> getMatchingRequests(
216      Priority priority, String resourceName, ExecutionType executionType,
217      Resource capability) {
218    return client.getMatchingRequests(priority, resourceName,
219        executionType, capability);
220  }
221
222  /**
223   * Returns all matching ContainerRequests that match the given
224   * AllocationRequestId.
225   *
226   * NOTE: This matches only requests that were made by the client WITH the
227   * allocationRequestId specified.
228   *
229   * @param allocationRequestId AllocationRequestId.
230   * @return All matching ContainerRequests
231   */
232  public Collection<T> getMatchingRequests(long allocationRequestId) {
233    return client.getMatchingRequests(allocationRequestId);
234  }
235  
236  /**
237   * Registers this application master with the resource manager. On successful
238   * registration, starts the heartbeating thread.
239   * @throws YarnException
240   * @throws IOException
241   */
242  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
243      String appHostName, int appHostPort, String appTrackingUrl)
244      throws YarnException, IOException;
245
246  /**
247   * Unregister the application master. This must be called in the end.
248   * @param appStatus Success/Failure status of the master
249   * @param appMessage Diagnostics message on failure
250   * @param appTrackingUrl New URL to get master info
251   * @throws YarnException
252   * @throws IOException
253   */
254  public abstract void unregisterApplicationMaster(
255      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
256  throws YarnException, IOException;
257
258  /**
259   * Request containers for resources before calling <code>allocate</code>
260   * @param req Resource request
261   */
262  public abstract void addContainerRequest(T req);
263
264  /**
265   * Remove previous container request. The previous container request may have 
266   * already been sent to the ResourceManager. So even after the remove request 
267   * the app must be prepared to receive an allocation for the previous request 
268   * even after the remove request
269   * @param req Resource request
270   */
271  public abstract void removeContainerRequest(T req);
272
273  /**
274   * Request container resource change before calling <code>allocate</code>.
275   * Any previous pending resource change request of the same container will be
276   * removed.
277   *
278   * Application that calls this method is expected to maintain the
279   * <code>Container</code>s that are returned from previous successful
280   * allocations or resource changes. By passing in the existing container and a
281   * target resource capability to this method, the application requests the
282   * ResourceManager to change the existing resource allocation to the target
283   * resource allocation.
284   *
285   * @param container The container returned from the last successful resource
286   *                  allocation or resource change
287   * @param capability  The target resource capability of the container
288   */
289  public abstract void requestContainerResourceChange(
290      Container container, Resource capability);
291
292  /**
293   * Release containers assigned by the Resource Manager. If the app cannot use
294   * the container or wants to give up the container then it can release them.
295   * The app needs to make new requests for the released resource capability if
296   * it still needs it. eg. it released non-local resources
297   * @param containerId
298   */
299  public abstract void releaseAssignedContainer(ContainerId containerId);
300
301  /**
302   * Get the currently available resources in the cluster.
303   * A valid value is available after a call to allocate has been made
304   * @return Currently available resources
305   */
306  public abstract Resource getAvailableResources();
307
308  /**
309   * Get the current number of nodes in the cluster.
310   * A valid values is available after a call to allocate has been made
311   * @return Current number of nodes in the cluster
312   */
313  public abstract int getClusterNodeCount();
314
315  /**
316   * Register TimelineClient to AMRMClient.
317   * @param timelineClient
318   */
319  public void registerTimelineClient(TimelineClient timelineClient) {
320    client.registerTimelineClient(timelineClient);
321  }
322
323  /**
324   * Get registered timeline client.
325   * @return the registered timeline client
326   */
327  public TimelineClient getRegisteredTimelineClient() {
328    return client.getRegisteredTimelineClient();
329  }
330
331  /**
332   * Update application's blacklist with addition or removal resources.
333   *
334   * @param blacklistAdditions list of resources which should be added to the
335   *        application blacklist
336   * @param blacklistRemovals list of resources which should be removed from the
337   *        application blacklist
338   */
339  public abstract void updateBlacklist(List<String> blacklistAdditions,
340                                       List<String> blacklistRemovals);
341
342  /**
343   * Wait for <code>check</code> to return true for each 1000 ms.
344   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
345   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
346   * @param check the condition for which it should wait
347   */
348  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
349    waitFor(check, 1000);
350  }
351
352  /**
353   * Wait for <code>check</code> to return true for each
354   * <code>checkEveryMillis</code> ms.
355   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
356   * @param check user defined checker
357   * @param checkEveryMillis interval to call <code>check</code>
358   */
359  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
360      throws InterruptedException {
361    waitFor(check, checkEveryMillis, 1);
362  };
363
364  /**
365   * Wait for <code>check</code> to return true for each
366   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
367   * the message "waiting in main loop" for each <code>logInterval</code> times
368   * iteration to confirm the thread is alive.
369   * @param check user defined checker
370   * @param checkEveryMillis interval to call <code>check</code>
371   * @param logInterval interval to log for each
372   */
373  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
374      int logInterval) throws InterruptedException {
375    Preconditions.checkNotNull(check, "check should not be null");
376    Preconditions.checkArgument(checkEveryMillis >= 0,
377        "checkEveryMillis should be positive value");
378    Preconditions.checkArgument(logInterval >= 0,
379        "logInterval should be positive value");
380
381    int loggingCounter = logInterval;
382    do {
383      if (LOG.isDebugEnabled()) {
384        LOG.debug("Check the condition for main loop.");
385      }
386
387      boolean result = check.get();
388      if (result) {
389        LOG.info("Exits the main loop.");
390        return;
391      }
392      if (--loggingCounter <= 0) {
393        LOG.info("Waiting in main loop.");
394        loggingCounter = logInterval;
395      }
396
397      Thread.sleep(checkEveryMillis);
398    } while (true);
399  }
400
401  /**
402   * <p>
403   * The callback abstract class. The callback functions need to be implemented
404   * by {@link AMRMClientAsync} users. The APIs are called when responses from
405   * the <code>ResourceManager</code> are available.
406   * </p>
407   */
408  public abstract static class AbstractCallbackHandler
409      implements CallbackHandler {
410
411    /**
412     * Called when the ResourceManager responds to a heartbeat with completed
413     * containers. If the response contains both completed containers and
414     * allocated containers, this will be called before containersAllocated.
415     */
416    public abstract void onContainersCompleted(List<ContainerStatus> statuses);
417
418    /**
419     * Called when the ResourceManager responds to a heartbeat with allocated
420     * containers. If the response containers both completed containers and
421     * allocated containers, this will be called after containersCompleted.
422     */
423    public abstract void onContainersAllocated(List<Container> containers);
424
425    /**
426     * Called when the ResourceManager responds to a heartbeat with containers
427     * whose resource allocation has been changed.
428     */
429    public abstract void onContainersResourceChanged(
430        List<Container> containers);
431
432    /**
433     * Called when the ResourceManager wants the ApplicationMaster to shutdown
434     * for being out of sync etc. The ApplicationMaster should not unregister
435     * with the RM unless the ApplicationMaster wants to be the last attempt.
436     */
437    public abstract void onShutdownRequest();
438
439    /**
440     * Called when nodes tracked by the ResourceManager have changed in health,
441     * availability etc.
442     */
443    public abstract void onNodesUpdated(List<NodeReport> updatedNodes);
444
445    public abstract float getProgress();
446
447    /**
448     * Called when error comes from RM communications as well as from errors in
449     * the callback itself from the app. Calling
450     * stop() is the recommended action.
451     */
452    public abstract void onError(Throwable e);
453  }
454
455  /**
456   * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead.
457   */
458  @Deprecated
459  public interface CallbackHandler {
460
461    /**
462     * Called when the ResourceManager responds to a heartbeat with completed
463     * containers. If the response contains both completed containers and
464     * allocated containers, this will be called before containersAllocated.
465     */
466    void onContainersCompleted(List<ContainerStatus> statuses);
467
468    /**
469     * Called when the ResourceManager responds to a heartbeat with allocated
470     * containers. If the response containers both completed containers and
471     * allocated containers, this will be called after containersCompleted.
472     */
473    void onContainersAllocated(List<Container> containers);
474
475    /**
476     * Called when the ResourceManager wants the ApplicationMaster to shutdown
477     * for being out of sync etc. The ApplicationMaster should not unregister
478     * with the RM unless the ApplicationMaster wants to be the last attempt.
479     */
480    void onShutdownRequest();
481
482    /**
483     * Called when nodes tracked by the ResourceManager have changed in health,
484     * availability etc.
485     */
486    void onNodesUpdated(List<NodeReport> updatedNodes);
487
488    float getProgress();
489
490    /**
491     * Called when error comes from RM communications as well as from errors in
492     * the callback itself from the app. Calling
493     * stop() is the recommended action.
494     *
495     * @param e
496     */
497    void onError(Throwable e);
498  }
499}