001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client.api.async;
020    
021    import com.google.common.base.Preconditions;
022    import com.google.common.base.Supplier;
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.List;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience.Private;
031    import org.apache.hadoop.classification.InterfaceAudience.Public;
032    import org.apache.hadoop.classification.InterfaceStability.Stable;
033    import org.apache.hadoop.service.AbstractService;
034    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
035    import org.apache.hadoop.yarn.api.records.Container;
036    import org.apache.hadoop.yarn.api.records.ContainerId;
037    import org.apache.hadoop.yarn.api.records.ContainerStatus;
038    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
039    import org.apache.hadoop.yarn.api.records.NodeReport;
040    import org.apache.hadoop.yarn.api.records.Priority;
041    import org.apache.hadoop.yarn.api.records.Resource;
042    import org.apache.hadoop.yarn.client.api.AMRMClient;
043    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
044    import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
045    import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
046    import org.apache.hadoop.yarn.exceptions.YarnException;
047    
048    import com.google.common.annotations.VisibleForTesting;
049    
050    /**
051     * <code>AMRMClientAsync</code> handles communication with the ResourceManager
052     * and provides asynchronous updates on events such as container allocations and
053     * completions.  It contains a thread that sends periodic heartbeats to the
054     * ResourceManager.
055     * 
056     * It should be used by implementing a CallbackHandler:
057     * <pre>
058     * {@code
059     * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
060     *   public void onContainersAllocated(List<Container> containers) {
061     *     [run tasks on the containers]
062     *   }
063     *   
064     *   public void onContainersCompleted(List<ContainerStatus> statuses) {
065     *     [update progress, check whether app is done]
066     *   }
067     *   
068     *   public void onNodesUpdated(List<NodeReport> updated) {}
069     *   
070     *   public void onReboot() {}
071     * }
072     * }
073     * </pre>
074     * 
075     * The client's lifecycle should be managed similarly to the following:
076     * 
077     * <pre>
078     * {@code
079     * AMRMClientAsync asyncClient = 
080     *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
081     * asyncClient.init(conf);
082     * asyncClient.start();
083     * RegisterApplicationMasterResponse response = asyncClient
084     *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
085     *       appMasterTrackingUrl);
086     * asyncClient.addContainerRequest(containerRequest);
087     * [... wait for application to complete]
088     * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
089     * asyncClient.stop();
090     * }
091     * </pre>
092     */
093    @Public
094    @Stable
095    public abstract class AMRMClientAsync<T extends ContainerRequest> 
096    extends AbstractService {
097      private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
098      
099      protected final AMRMClient<T> client;
100      protected final CallbackHandler handler;
101      protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
102    
103      public static <T extends ContainerRequest> AMRMClientAsync<T>
104          createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
105        return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
106      }
107      
108      public static <T extends ContainerRequest> AMRMClientAsync<T>
109          createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
110              CallbackHandler callbackHandler) {
111        return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
112      }
113      
114      protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
115        this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
116      }
117      
118      @Private
119      @VisibleForTesting
120      protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
121          CallbackHandler callbackHandler) {
122        super(AMRMClientAsync.class.getName());
123        this.client = client;
124        this.heartbeatIntervalMs.set(intervalMs);
125        this.handler = callbackHandler;
126      }
127        
128      public void setHeartbeatInterval(int interval) {
129        heartbeatIntervalMs.set(interval);
130      }
131      
132      public abstract List<? extends Collection<T>> getMatchingRequests(
133                                                       Priority priority, 
134                                                       String resourceName, 
135                                                       Resource capability);
136      
137      /**
138       * Registers this application master with the resource manager. On successful
139       * registration, starts the heartbeating thread.
140       * @throws YarnException
141       * @throws IOException
142       */
143      public abstract RegisterApplicationMasterResponse registerApplicationMaster(
144          String appHostName, int appHostPort, String appTrackingUrl)
145          throws YarnException, IOException;
146    
147      /**
148       * Unregister the application master. This must be called in the end.
149       * @param appStatus Success/Failure status of the master
150       * @param appMessage Diagnostics message on failure
151       * @param appTrackingUrl New URL to get master info
152       * @throws YarnException
153       * @throws IOException
154       */
155      public abstract void unregisterApplicationMaster(
156          FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
157      throws YarnException, IOException;
158    
159      /**
160       * Request containers for resources before calling <code>allocate</code>
161       * @param req Resource request
162       */
163      public abstract void addContainerRequest(T req);
164    
165      /**
166       * Remove previous container request. The previous container request may have 
167       * already been sent to the ResourceManager. So even after the remove request 
168       * the app must be prepared to receive an allocation for the previous request 
169       * even after the remove request
170       * @param req Resource request
171       */
172      public abstract void removeContainerRequest(T req);
173    
174      /**
175       * Release containers assigned by the Resource Manager. If the app cannot use
176       * the container or wants to give up the container then it can release them.
177       * The app needs to make new requests for the released resource capability if
178       * it still needs it. eg. it released non-local resources
179       * @param containerId
180       */
181      public abstract void releaseAssignedContainer(ContainerId containerId);
182    
183      /**
184       * Get the currently available resources in the cluster.
185       * A valid value is available after a call to allocate has been made
186       * @return Currently available resources
187       */
188      public abstract Resource getAvailableResources();
189    
190      /**
191       * Get the current number of nodes in the cluster.
192       * A valid values is available after a call to allocate has been made
193       * @return Current number of nodes in the cluster
194       */
195      public abstract int getClusterNodeCount();
196    
197      /**
198       * Wait for <code>check</code> to return true for each 1000 ms.
199       * See also {@link #waitFor(com.google.common.base.Supplier, int)}
200       * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
201       * @param check
202       */
203      public void waitFor(Supplier<Boolean> check) throws InterruptedException {
204        waitFor(check, 1000);
205      }
206    
207      /**
208       * Wait for <code>check</code> to return true for each
209       * <code>checkEveryMillis</code> ms.
210       * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
211       * @param check user defined checker
212       * @param checkEveryMillis interval to call <code>check</code>
213       */
214      public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
215          throws InterruptedException {
216        waitFor(check, checkEveryMillis, 1);
217      };
218    
219      /**
220       * Wait for <code>check</code> to return true for each
221       * <code>checkEveryMillis</code> ms. In the main loop, this method will log
222       * the message "waiting in main loop" for each <code>logInterval</code> times
223       * iteration to confirm the thread is alive.
224       * @param check user defined checker
225       * @param checkEveryMillis interval to call <code>check</code>
226       * @param logInterval interval to log for each
227       */
228      public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
229          int logInterval) throws InterruptedException {
230        Preconditions.checkNotNull(check, "check should not be null");
231        Preconditions.checkArgument(checkEveryMillis >= 0,
232            "checkEveryMillis should be positive value");
233        Preconditions.checkArgument(logInterval >= 0,
234            "logInterval should be positive value");
235    
236        int loggingCounter = logInterval;
237        do {
238          if (LOG.isDebugEnabled()) {
239            LOG.debug("Check the condition for main loop.");
240          }
241    
242          boolean result = check.get();
243          if (result) {
244            LOG.info("Exits the main loop.");
245            return;
246          }
247          if (--loggingCounter <= 0) {
248            LOG.info("Waiting in main loop.");
249            loggingCounter = logInterval;
250          }
251    
252          Thread.sleep(checkEveryMillis);
253        } while (true);
254      }
255    
256      public interface CallbackHandler {
257        
258        /**
259         * Called when the ResourceManager responds to a heartbeat with completed
260         * containers. If the response contains both completed containers and
261         * allocated containers, this will be called before containersAllocated.
262         */
263        public void onContainersCompleted(List<ContainerStatus> statuses);
264        
265        /**
266         * Called when the ResourceManager responds to a heartbeat with allocated
267         * containers. If the response containers both completed containers and
268         * allocated containers, this will be called after containersCompleted.
269         */
270        public void onContainersAllocated(List<Container> containers);
271        
272        /**
273         * Called when the ResourceManager wants the ApplicationMaster to shutdown
274         * for being out of sync etc. The ApplicationMaster should not unregister
275         * with the RM unless the ApplicationMaster wants to be the last attempt.
276         */
277        public void onShutdownRequest();
278        
279        /**
280         * Called when nodes tracked by the ResourceManager have changed in health,
281         * availability etc.
282         */
283        public void onNodesUpdated(List<NodeReport> updatedNodes);
284        
285        public float getProgress();
286        
287        /**
288         * Called when error comes from RM communications as well as from errors in
289         * the callback itself from the app. Calling
290         * stop() is the recommended action.
291         *
292         * @param e
293         */
294        public void onError(Throwable e);
295      }
296    }