001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client.api.async;
020
021import com.google.common.base.Preconditions;
022import com.google.common.base.Supplier;
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Private;
031import org.apache.hadoop.classification.InterfaceAudience.Public;
032import org.apache.hadoop.classification.InterfaceStability.Unstable;
033import org.apache.hadoop.classification.InterfaceStability.Stable;
034import org.apache.hadoop.service.AbstractService;
035import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
036import org.apache.hadoop.yarn.api.records.Container;
037import org.apache.hadoop.yarn.api.records.ContainerId;
038import org.apache.hadoop.yarn.api.records.ContainerStatus;
039import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
040import org.apache.hadoop.yarn.api.records.NodeReport;
041import org.apache.hadoop.yarn.api.records.Priority;
042import org.apache.hadoop.yarn.api.records.Resource;
043import org.apache.hadoop.yarn.api.records.UpdatedContainer;
044import org.apache.hadoop.yarn.client.api.AMRMClient;
045import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
046import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
047import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
048import org.apache.hadoop.yarn.exceptions.YarnException;
049
050import com.google.common.annotations.VisibleForTesting;
051
/**
 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
 * and provides asynchronous updates on events such as container allocations and
 * completions.  It contains a thread that sends periodic heartbeats to the
 * ResourceManager.
 * 
 * It should be used by implementing a callback handler:
 * <pre>
 * {@code
 * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
 *   public void onContainersAllocated(List<Container> containers) {
 *     [run tasks on the containers]
 *   }
 *
 *   public void onContainersUpdated(List<UpdatedContainer> containers) {
 *     [determine if resource allocation of containers has been increased in
 *      the ResourceManager, and if so, inform the NodeManagers to increase the
 *      resource monitor/enforcement on the containers]
 *   }
 *
 *   public void onContainersCompleted(List<ContainerStatus> statuses) {
 *     [update progress, check whether app is done]
 *   }
 *   
 *   public void onNodesUpdated(List<NodeReport> updated) {}
 *   
 *   public void onShutdownRequest() {}
 *
 *   public float getProgress() {
 *     [return the progress of the application]
 *   }
 *
 *   public void onError(Throwable e) {
 *     [handle the error; calling stop() on the client is recommended]
 *   }
 * }
 * }
 * </pre>
 * 
 * The client's lifecycle should be managed similarly to the following:
 * 
 * <pre>
 * {@code
 * AMRMClientAsync<ContainerRequest> asyncClient =
 *     AMRMClientAsync.createAMRMClientAsync(1000, new MyCallbackHandler());
 * asyncClient.init(conf);
 * asyncClient.start();
 * RegisterApplicationMasterResponse response = asyncClient
 *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
 *       appMasterTrackingUrl);
 * asyncClient.addContainerRequest(containerRequest);
 * [... wait for application to complete]
 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
 * asyncClient.stop();
 * }
 * </pre>
 */
101@Public
102@Stable
103public abstract class AMRMClientAsync<T extends ContainerRequest> 
104extends AbstractService {
105  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
106  
107  protected final AMRMClient<T> client;
108  protected final CallbackHandler handler;
109  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
110
111  /**
112   * <p>Create a new instance of AMRMClientAsync.</p>
113   *
114   * @param intervalMs heartbeat interval in milliseconds between AM and RM
115   * @param callbackHandler callback handler that processes responses from
116   *                        the <code>ResourceManager</code>
117   */
118  public static <T extends ContainerRequest> AMRMClientAsync<T>
119      createAMRMClientAsync(
120      int intervalMs, AbstractCallbackHandler callbackHandler) {
121    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
122  }
123
124  /**
125   * <p>Create a new instance of AMRMClientAsync.</p>
126   *
127   * @param client the AMRMClient instance
128   * @param intervalMs heartbeat interval in milliseconds between AM and RM
129   * @param callbackHandler callback handler that processes responses from
130   *                        the <code>ResourceManager</code>
131   */
132  public static <T extends ContainerRequest> AMRMClientAsync<T>
133      createAMRMClientAsync(
134      AMRMClient<T> client, int intervalMs,
135      AbstractCallbackHandler callbackHandler) {
136    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
137  }
138
139  protected AMRMClientAsync(
140      int intervalMs, AbstractCallbackHandler callbackHandler) {
141    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
142  }
143
144  @Private
145  @VisibleForTesting
146  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
147      AbstractCallbackHandler callbackHandler) {
148    super(AMRMClientAsync.class.getName());
149    this.client = client;
150    this.heartbeatIntervalMs.set(intervalMs);
151    this.handler = callbackHandler;
152  }
153
154  /**
155   *
156   * @deprecated Use {@link #createAMRMClientAsync(int,
157   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
158   */
159  @Deprecated
160  public static <T extends ContainerRequest> AMRMClientAsync<T>
161      createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
162    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
163  }
164
165  /**
166   *
167   * @deprecated Use {@link #createAMRMClientAsync(AMRMClient,
168   *             int, AMRMClientAsync.AbstractCallbackHandler)} instead.
169   */
170  @Deprecated
171  public static <T extends ContainerRequest> AMRMClientAsync<T>
172      createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
173          CallbackHandler callbackHandler) {
174    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
175  }
176
177  @Deprecated
178  protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
179    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
180  }
181  
182  @Private
183  @VisibleForTesting
184  @Deprecated
185  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
186      CallbackHandler callbackHandler) {
187    super(AMRMClientAsync.class.getName());
188    this.client = client;
189    this.heartbeatIntervalMs.set(intervalMs);
190    this.handler = callbackHandler;
191  }
192    
193  public void setHeartbeatInterval(int interval) {
194    heartbeatIntervalMs.set(interval);
195  }
196  
197  public abstract List<? extends Collection<T>> getMatchingRequests(
198                                                   Priority priority, 
199                                                   String resourceName, 
200                                                   Resource capability);
201  
202  /**
203   * Registers this application master with the resource manager. On successful
204   * registration, starts the heartbeating thread.
205   * @throws YarnException
206   * @throws IOException
207   */
208  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
209      String appHostName, int appHostPort, String appTrackingUrl)
210      throws YarnException, IOException;
211
212  /**
213   * Unregister the application master. This must be called in the end.
214   * @param appStatus Success/Failure status of the master
215   * @param appMessage Diagnostics message on failure
216   * @param appTrackingUrl New URL to get master info
217   * @throws YarnException
218   * @throws IOException
219   */
220  public abstract void unregisterApplicationMaster(
221      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
222  throws YarnException, IOException;
223
224  /**
225   * Request containers for resources before calling <code>allocate</code>
226   * @param req Resource request
227   */
228  public abstract void addContainerRequest(T req);
229
230  /**
231   * Remove previous container request. The previous container request may have 
232   * already been sent to the ResourceManager. So even after the remove request 
233   * the app must be prepared to receive an allocation for the previous request 
234   * even after the remove request
235   * @param req Resource request
236   */
237  public abstract void removeContainerRequest(T req);
238
239  /**
240   * Request container resource change before calling <code>allocate</code>.
241   * Any previous pending resource change request of the same container will be
242   * removed.
243   *
244   * Application that calls this method is expected to maintain the
245   * <code>Container</code>s that are returned from previous successful
246   * allocations or resource changes. By passing in the existing container and a
247   * target resource capability to this method, the application requests the
248   * ResourceManager to change the existing resource allocation to the target
249   * resource allocation.
250   *
251   * @param container The container returned from the last successful resource
252   *                  allocation or resource change
253   * @param capability  The target resource capability of the container
254   */
255  public abstract void requestContainerResourceChange(
256      Container container, Resource capability);
257
258  /**
259   * Release containers assigned by the Resource Manager. If the app cannot use
260   * the container or wants to give up the container then it can release them.
261   * The app needs to make new requests for the released resource capability if
262   * it still needs it. eg. it released non-local resources
263   * @param containerId
264   */
265  public abstract void releaseAssignedContainer(ContainerId containerId);
266
267  /**
268   * Get the currently available resources in the cluster.
269   * A valid value is available after a call to allocate has been made
270   * @return Currently available resources
271   */
272  public abstract Resource getAvailableResources();
273
274  /**
275   * Get the current number of nodes in the cluster.
276   * A valid values is available after a call to allocate has been made
277   * @return Current number of nodes in the cluster
278   */
279  public abstract int getClusterNodeCount();
280
281  /**
282   * Update application's blacklist with addition or removal resources.
283   *
284   * @param blacklistAdditions list of resources which should be added to the
285   *        application blacklist
286   * @param blacklistRemovals list of resources which should be removed from the
287   *        application blacklist
288   */
289  public abstract void updateBlacklist(List<String> blacklistAdditions,
290                                       List<String> blacklistRemovals);
291
292  /**
293   * Wait for <code>check</code> to return true for each 1000 ms.
294   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
295   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
296   * @param check
297   */
298  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
299    waitFor(check, 1000);
300  }
301
302  /**
303   * Wait for <code>check</code> to return true for each
304   * <code>checkEveryMillis</code> ms.
305   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
306   * @param check user defined checker
307   * @param checkEveryMillis interval to call <code>check</code>
308   */
309  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
310      throws InterruptedException {
311    waitFor(check, checkEveryMillis, 1);
312  };
313
314  /**
315   * Wait for <code>check</code> to return true for each
316   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
317   * the message "waiting in main loop" for each <code>logInterval</code> times
318   * iteration to confirm the thread is alive.
319   * @param check user defined checker
320   * @param checkEveryMillis interval to call <code>check</code>
321   * @param logInterval interval to log for each
322   */
323  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
324      int logInterval) throws InterruptedException {
325    Preconditions.checkNotNull(check, "check should not be null");
326    Preconditions.checkArgument(checkEveryMillis >= 0,
327        "checkEveryMillis should be positive value");
328    Preconditions.checkArgument(logInterval >= 0,
329        "logInterval should be positive value");
330
331    int loggingCounter = logInterval;
332    do {
333      if (LOG.isDebugEnabled()) {
334        LOG.debug("Check the condition for main loop.");
335      }
336
337      boolean result = check.get();
338      if (result) {
339        LOG.info("Exits the main loop.");
340        return;
341      }
342      if (--loggingCounter <= 0) {
343        LOG.info("Waiting in main loop.");
344        loggingCounter = logInterval;
345      }
346
347      Thread.sleep(checkEveryMillis);
348    } while (true);
349  }
350
351  /**
352   * <p>
353   * The callback abstract class. The callback functions need to be implemented
354   * by {@link AMRMClientAsync} users. The APIs are called when responses from
355   * the <code>ResourceManager</code> are available.
356   * </p>
357   */
358  public abstract static class AbstractCallbackHandler
359      implements CallbackHandler {
360
361    /**
362     * Called when the ResourceManager responds to a heartbeat with completed
363     * containers. If the response contains both completed containers and
364     * allocated containers, this will be called before containersAllocated.
365     */
366    public abstract void onContainersCompleted(List<ContainerStatus> statuses);
367
368    /**
369     * Called when the ResourceManager responds to a heartbeat with allocated
370     * containers. If the response containers both completed containers and
371     * allocated containers, this will be called after containersCompleted.
372     */
373    public abstract void onContainersAllocated(List<Container> containers);
374
375    /**
376     * Called when the ResourceManager responds to a heartbeat with containers
377     * whose resource allocation has been changed.
378     */
379    @Public
380    @Unstable
381    public abstract void onContainersUpdated(List<UpdatedContainer> containers);
382
383    /**
384     * Called when the ResourceManager wants the ApplicationMaster to shutdown
385     * for being out of sync etc. The ApplicationMaster should not unregister
386     * with the RM unless the ApplicationMaster wants to be the last attempt.
387     */
388    public abstract void onShutdownRequest();
389
390    /**
391     * Called when nodes tracked by the ResourceManager have changed in health,
392     * availability etc.
393     */
394    public abstract void onNodesUpdated(List<NodeReport> updatedNodes);
395
396    public abstract float getProgress();
397
398    /**
399     * Called when error comes from RM communications as well as from errors in
400     * the callback itself from the app. Calling
401     * stop() is the recommended action.
402     */
403    public abstract void onError(Throwable e);
404  }
405
406  /**
407   * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead.
408   */
409  @Deprecated
410  public interface CallbackHandler {
411
412    /**
413     * Called when the ResourceManager responds to a heartbeat with completed
414     * containers. If the response contains both completed containers and
415     * allocated containers, this will be called before containersAllocated.
416     */
417    void onContainersCompleted(List<ContainerStatus> statuses);
418
419    /**
420     * Called when the ResourceManager responds to a heartbeat with allocated
421     * containers. If the response containers both completed containers and
422     * allocated containers, this will be called after containersCompleted.
423     */
424    void onContainersAllocated(List<Container> containers);
425
426    /**
427     * Called when the ResourceManager wants the ApplicationMaster to shutdown
428     * for being out of sync etc. The ApplicationMaster should not unregister
429     * with the RM unless the ApplicationMaster wants to be the last attempt.
430     */
431    void onShutdownRequest();
432
433    /**
434     * Called when nodes tracked by the ResourceManager have changed in health,
435     * availability etc.
436     */
437    void onNodesUpdated(List<NodeReport> updatedNodes);
438
439    float getProgress();
440
441    /**
442     * Called when error comes from RM communications as well as from errors in
443     * the callback itself from the app. Calling
444     * stop() is the recommended action.
445     *
446     * @param e
447     */
448    void onError(Throwable e);
449  }
450}