001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client.api.async;
020
021import com.google.common.base.Preconditions;
022import com.google.common.base.Supplier;
023import java.io.IOException;
024import java.util.Collection;
025import java.util.List;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Private;
031import org.apache.hadoop.classification.InterfaceAudience.Public;
032import org.apache.hadoop.classification.InterfaceStability.Stable;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
035import org.apache.hadoop.yarn.api.records.Container;
036import org.apache.hadoop.yarn.api.records.ContainerId;
037import org.apache.hadoop.yarn.api.records.ContainerStatus;
038import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
039import org.apache.hadoop.yarn.api.records.NodeReport;
040import org.apache.hadoop.yarn.api.records.Priority;
041import org.apache.hadoop.yarn.api.records.Resource;
042import org.apache.hadoop.yarn.client.api.AMRMClient;
043import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
044import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
045import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
046import org.apache.hadoop.yarn.exceptions.YarnException;
047
048import com.google.common.annotations.VisibleForTesting;
049
050/**
051 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
052 * and provides asynchronous updates on events such as container allocations and
053 * completions.  It contains a thread that sends periodic heartbeats to the
054 * ResourceManager.
055 * 
056 * It should be used by implementing a CallbackHandler:
057 * <pre>
058 * {@code
059 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
060 *   public void onContainersAllocated(List<Container> containers) {
061 *     [run tasks on the containers]
062 *   }
063 *   
064 *   public void onContainersCompleted(List<ContainerStatus> statuses) {
065 *     [update progress, check whether app is done]
066 *   }
067 *   
068 *   public void onNodesUpdated(List<NodeReport> updated) {}
069 *   
070 *   public void onReboot() {}
071 * }
072 * }
073 * </pre>
074 * 
075 * The client's lifecycle should be managed similarly to the following:
076 * 
077 * <pre>
078 * {@code
079 * AMRMClientAsync asyncClient = 
080 *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
081 * asyncClient.init(conf);
082 * asyncClient.start();
083 * RegisterApplicationMasterResponse response = asyncClient
084 *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
085 *       appMasterTrackingUrl);
086 * asyncClient.addContainerRequest(containerRequest);
087 * [... wait for application to complete]
088 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
089 * asyncClient.stop();
090 * }
091 * </pre>
092 */
093@Public
094@Stable
095public abstract class AMRMClientAsync<T extends ContainerRequest> 
096extends AbstractService {
097  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
098  
099  protected final AMRMClient<T> client;
100  protected final CallbackHandler handler;
101  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
102
103  public static <T extends ContainerRequest> AMRMClientAsync<T>
104      createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
105    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
106  }
107  
108  public static <T extends ContainerRequest> AMRMClientAsync<T>
109      createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
110          CallbackHandler callbackHandler) {
111    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
112  }
113  
114  protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
115    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
116  }
117  
118  @Private
119  @VisibleForTesting
120  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
121      CallbackHandler callbackHandler) {
122    super(AMRMClientAsync.class.getName());
123    this.client = client;
124    this.heartbeatIntervalMs.set(intervalMs);
125    this.handler = callbackHandler;
126  }
127    
128  public void setHeartbeatInterval(int interval) {
129    heartbeatIntervalMs.set(interval);
130  }
131  
132  public abstract List<? extends Collection<T>> getMatchingRequests(
133                                                   Priority priority, 
134                                                   String resourceName, 
135                                                   Resource capability);
136  
137  /**
138   * Registers this application master with the resource manager. On successful
139   * registration, starts the heartbeating thread.
140   * @throws YarnException
141   * @throws IOException
142   */
143  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
144      String appHostName, int appHostPort, String appTrackingUrl)
145      throws YarnException, IOException;
146
147  /**
148   * Unregister the application master. This must be called in the end.
149   * @param appStatus Success/Failure status of the master
150   * @param appMessage Diagnostics message on failure
151   * @param appTrackingUrl New URL to get master info
152   * @throws YarnException
153   * @throws IOException
154   */
155  public abstract void unregisterApplicationMaster(
156      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
157  throws YarnException, IOException;
158
159  /**
160   * Request containers for resources before calling <code>allocate</code>
161   * @param req Resource request
162   */
163  public abstract void addContainerRequest(T req);
164
165  /**
166   * Remove previous container request. The previous container request may have 
167   * already been sent to the ResourceManager. So even after the remove request 
168   * the app must be prepared to receive an allocation for the previous request 
169   * even after the remove request
170   * @param req Resource request
171   */
172  public abstract void removeContainerRequest(T req);
173
174  /**
175   * Release containers assigned by the Resource Manager. If the app cannot use
176   * the container or wants to give up the container then it can release them.
177   * The app needs to make new requests for the released resource capability if
178   * it still needs it. eg. it released non-local resources
179   * @param containerId
180   */
181  public abstract void releaseAssignedContainer(ContainerId containerId);
182
183  /**
184   * Get the currently available resources in the cluster.
185   * A valid value is available after a call to allocate has been made
186   * @return Currently available resources
187   */
188  public abstract Resource getAvailableResources();
189
190  /**
191   * Get the current number of nodes in the cluster.
192   * A valid values is available after a call to allocate has been made
193   * @return Current number of nodes in the cluster
194   */
195  public abstract int getClusterNodeCount();
196
197  /**
198   * Update application's blacklist with addition or removal resources.
199   *
200   * @param blacklistAdditions list of resources which should be added to the
201   *        application blacklist
202   * @param blacklistRemovals list of resources which should be removed from the
203   *        application blacklist
204   */
205  public abstract void updateBlacklist(List<String> blacklistAdditions,
206                                       List<String> blacklistRemovals);
207
208  /**
209   * Wait for <code>check</code> to return true for each 1000 ms.
210   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
211   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
212   * @param check
213   */
214  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
215    waitFor(check, 1000);
216  }
217
218  /**
219   * Wait for <code>check</code> to return true for each
220   * <code>checkEveryMillis</code> ms.
221   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
222   * @param check user defined checker
223   * @param checkEveryMillis interval to call <code>check</code>
224   */
225  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
226      throws InterruptedException {
227    waitFor(check, checkEveryMillis, 1);
228  };
229
230  /**
231   * Wait for <code>check</code> to return true for each
232   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
233   * the message "waiting in main loop" for each <code>logInterval</code> times
234   * iteration to confirm the thread is alive.
235   * @param check user defined checker
236   * @param checkEveryMillis interval to call <code>check</code>
237   * @param logInterval interval to log for each
238   */
239  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
240      int logInterval) throws InterruptedException {
241    Preconditions.checkNotNull(check, "check should not be null");
242    Preconditions.checkArgument(checkEveryMillis >= 0,
243        "checkEveryMillis should be positive value");
244    Preconditions.checkArgument(logInterval >= 0,
245        "logInterval should be positive value");
246
247    int loggingCounter = logInterval;
248    do {
249      if (LOG.isDebugEnabled()) {
250        LOG.debug("Check the condition for main loop.");
251      }
252
253      boolean result = check.get();
254      if (result) {
255        LOG.info("Exits the main loop.");
256        return;
257      }
258      if (--loggingCounter <= 0) {
259        LOG.info("Waiting in main loop.");
260        loggingCounter = logInterval;
261      }
262
263      Thread.sleep(checkEveryMillis);
264    } while (true);
265  }
266
267  public interface CallbackHandler {
268    
269    /**
270     * Called when the ResourceManager responds to a heartbeat with completed
271     * containers. If the response contains both completed containers and
272     * allocated containers, this will be called before containersAllocated.
273     */
274    public void onContainersCompleted(List<ContainerStatus> statuses);
275    
276    /**
277     * Called when the ResourceManager responds to a heartbeat with allocated
278     * containers. If the response containers both completed containers and
279     * allocated containers, this will be called after containersCompleted.
280     */
281    public void onContainersAllocated(List<Container> containers);
282    
283    /**
284     * Called when the ResourceManager wants the ApplicationMaster to shutdown
285     * for being out of sync etc. The ApplicationMaster should not unregister
286     * with the RM unless the ApplicationMaster wants to be the last attempt.
287     */
288    public void onShutdownRequest();
289    
290    /**
291     * Called when nodes tracked by the ResourceManager have changed in health,
292     * availability etc.
293     */
294    public void onNodesUpdated(List<NodeReport> updatedNodes);
295    
296    public float getProgress();
297    
298    /**
299     * Called when error comes from RM communications as well as from errors in
300     * the callback itself from the app. Calling
301     * stop() is the recommended action.
302     *
303     * @param e
304     */
305    public void onError(Throwable e);
306  }
307}