001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client.api.async;
020    
021    import java.io.IOException;
022    import java.util.Collection;
023    import java.util.List;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.hadoop.classification.InterfaceAudience.Private;
027    import org.apache.hadoop.classification.InterfaceAudience.Public;
028    import org.apache.hadoop.classification.InterfaceStability.Stable;
029    import org.apache.hadoop.service.AbstractService;
030    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
031    import org.apache.hadoop.yarn.api.records.Container;
032    import org.apache.hadoop.yarn.api.records.ContainerId;
033    import org.apache.hadoop.yarn.api.records.ContainerStatus;
034    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
035    import org.apache.hadoop.yarn.api.records.NodeReport;
036    import org.apache.hadoop.yarn.api.records.Priority;
037    import org.apache.hadoop.yarn.api.records.Resource;
038    import org.apache.hadoop.yarn.client.api.AMRMClient;
039    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
040    import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
041    import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
042    import org.apache.hadoop.yarn.exceptions.YarnException;
043    
044    import com.google.common.annotations.VisibleForTesting;
045    
046    /**
047     * <code>AMRMClientAsync</code> handles communication with the ResourceManager
048     * and provides asynchronous updates on events such as container allocations and
049     * completions.  It contains a thread that sends periodic heartbeats to the
050     * ResourceManager.
051     * 
052     * It should be used by implementing a CallbackHandler:
053     * <pre>
054     * {@code
055     * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
056     *   public void onContainersAllocated(List<Container> containers) {
057     *     [run tasks on the containers]
058     *   }
059     *   
060     *   public void onContainersCompleted(List<ContainerStatus> statuses) {
061     *     [update progress, check whether app is done]
062     *   }
063     *   
064     *   public void onNodesUpdated(List<NodeReport> updated) {}
065     *   
066     *   public void onReboot() {}
067     * }
068     * }
069     * </pre>
070     * 
071     * The client's lifecycle should be managed similarly to the following:
072     * 
073     * <pre>
074     * {@code
075     * AMRMClientAsync asyncClient = 
076     *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
077     * asyncClient.init(conf);
078     * asyncClient.start();
079     * RegisterApplicationMasterResponse response = asyncClient
080     *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
081     *       appMasterTrackingUrl);
082     * asyncClient.addContainerRequest(containerRequest);
083     * [... wait for application to complete]
084     * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
085     * asyncClient.stop();
086     * }
087     * </pre>
088     */
089    @Public
090    @Stable
091    public abstract class AMRMClientAsync<T extends ContainerRequest> 
092    extends AbstractService {
093      
094      protected final AMRMClient<T> client;
095      protected final CallbackHandler handler;
096      protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
097    
098      public static <T extends ContainerRequest> AMRMClientAsync<T>
099          createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
100        return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
101      }
102      
103      public static <T extends ContainerRequest> AMRMClientAsync<T>
104          createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
105              CallbackHandler callbackHandler) {
106        return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
107      }
108      
109      protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
110        this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
111      }
112      
113      @Private
114      @VisibleForTesting
115      protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
116          CallbackHandler callbackHandler) {
117        super(AMRMClientAsync.class.getName());
118        this.client = client;
119        this.heartbeatIntervalMs.set(intervalMs);
120        this.handler = callbackHandler;
121      }
122        
123      public void setHeartbeatInterval(int interval) {
124        heartbeatIntervalMs.set(interval);
125      }
126      
127      public abstract List<? extends Collection<T>> getMatchingRequests(
128                                                       Priority priority, 
129                                                       String resourceName, 
130                                                       Resource capability);
131      
132      /**
133       * Registers this application master with the resource manager. On successful
134       * registration, starts the heartbeating thread.
135       * @throws YarnException
136       * @throws IOException
137       */
138      public abstract RegisterApplicationMasterResponse registerApplicationMaster(
139          String appHostName, int appHostPort, String appTrackingUrl)
140          throws YarnException, IOException;
141    
142      /**
143       * Unregister the application master. This must be called in the end.
144       * @param appStatus Success/Failure status of the master
145       * @param appMessage Diagnostics message on failure
146       * @param appTrackingUrl New URL to get master info
147       * @throws YarnException
148       * @throws IOException
149       */
150      public abstract void unregisterApplicationMaster(
151          FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
152      throws YarnException, IOException;
153    
154      /**
155       * Request containers for resources before calling <code>allocate</code>
156       * @param req Resource request
157       */
158      public abstract void addContainerRequest(T req);
159    
160      /**
161       * Remove previous container request. The previous container request may have 
162       * already been sent to the ResourceManager. So even after the remove request 
163       * the app must be prepared to receive an allocation for the previous request 
164       * even after the remove request
165       * @param req Resource request
166       */
167      public abstract void removeContainerRequest(T req);
168    
169      /**
170       * Release containers assigned by the Resource Manager. If the app cannot use
171       * the container or wants to give up the container then it can release them.
172       * The app needs to make new requests for the released resource capability if
173       * it still needs it. eg. it released non-local resources
174       * @param containerId
175       */
176      public abstract void releaseAssignedContainer(ContainerId containerId);
177    
178      /**
179       * Get the currently available resources in the cluster.
180       * A valid value is available after a call to allocate has been made
181       * @return Currently available resources
182       */
183      public abstract Resource getAvailableResources();
184    
185      /**
186       * Get the current number of nodes in the cluster.
187       * A valid values is available after a call to allocate has been made
188       * @return Current number of nodes in the cluster
189       */
190      public abstract int getClusterNodeCount();
191    
192      public interface CallbackHandler {
193        
194        /**
195         * Called when the ResourceManager responds to a heartbeat with completed
196         * containers. If the response contains both completed containers and
197         * allocated containers, this will be called before containersAllocated.
198         */
199        public void onContainersCompleted(List<ContainerStatus> statuses);
200        
201        /**
202         * Called when the ResourceManager responds to a heartbeat with allocated
203         * containers. If the response containers both completed containers and
204         * allocated containers, this will be called after containersCompleted.
205         */
206        public void onContainersAllocated(List<Container> containers);
207        
208        /**
209         * Called when the ResourceManager wants the ApplicationMaster to shutdown
210         * for being out of sync etc. The ApplicationMaster should not unregister
211         * with the RM unless the ApplicationMaster wants to be the last attempt.
212         */
213        public void onShutdownRequest();
214        
215        /**
216         * Called when nodes tracked by the ResourceManager have changed in health,
217         * availability etc.
218         */
219        public void onNodesUpdated(List<NodeReport> updatedNodes);
220        
221        public float getProgress();
222        
223        /**
224         * Called when error comes from RM communications as well as from errors in
225         * the callback itself from the app. Calling
226         * stop() is the recommended action.
227         *
228         * @param e
229         */
230        public void onError(Throwable e);
231      }
232    }