001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client.api.async;
020
021 import java.io.IOException;
022 import java.util.Collection;
023 import java.util.List;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.apache.hadoop.classification.InterfaceAudience.Private;
027 import org.apache.hadoop.classification.InterfaceAudience.Public;
028 import org.apache.hadoop.classification.InterfaceStability.Stable;
029 import org.apache.hadoop.service.AbstractService;
030 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
031 import org.apache.hadoop.yarn.api.records.Container;
032 import org.apache.hadoop.yarn.api.records.ContainerId;
033 import org.apache.hadoop.yarn.api.records.ContainerStatus;
034 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
035 import org.apache.hadoop.yarn.api.records.NodeReport;
036 import org.apache.hadoop.yarn.api.records.Priority;
037 import org.apache.hadoop.yarn.api.records.Resource;
038 import org.apache.hadoop.yarn.client.api.AMRMClient;
039 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
040 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
041 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
042 import org.apache.hadoop.yarn.exceptions.YarnException;
043
044 import com.google.common.annotations.VisibleForTesting;
045
046 /**
047 * <code>AMRMClientAsync</code> handles communication with the ResourceManager
048 * and provides asynchronous updates on events such as container allocations and
049 * completions. It contains a thread that sends periodic heartbeats to the
050 * ResourceManager.
051 *
052 * It should be used by implementing a CallbackHandler:
053 * <pre>
054 * {@code
055 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
056 * public void onContainersAllocated(List<Container> containers) {
057 * [run tasks on the containers]
058 * }
059 *
060 * public void onContainersCompleted(List<ContainerStatus> statuses) {
061 * [update progress, check whether app is done]
062 * }
063 *
064 * public void onNodesUpdated(List<NodeReport> updated) {}
065 *
066 * public void onReboot() {}
067 * }
068 * }
069 * </pre>
070 *
071 * The client's lifecycle should be managed similarly to the following:
072 *
073 * <pre>
074 * {@code
075 * AMRMClientAsync asyncClient =
076 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
077 * asyncClient.init(conf);
078 * asyncClient.start();
079 * RegisterApplicationMasterResponse response = asyncClient
080 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
081 * appMasterTrackingUrl);
082 * asyncClient.addContainerRequest(containerRequest);
083 * [... wait for application to complete]
084 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
085 * asyncClient.stop();
086 * }
087 * </pre>
088 */
089 @Public
090 @Stable
091 public abstract class AMRMClientAsync<T extends ContainerRequest>
092 extends AbstractService {
093
094 protected final AMRMClient<T> client;
095 protected final CallbackHandler handler;
096 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
097
098 public static <T extends ContainerRequest> AMRMClientAsync<T>
099 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
100 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
101 }
102
103 public static <T extends ContainerRequest> AMRMClientAsync<T>
104 createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
105 CallbackHandler callbackHandler) {
106 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
107 }
108
109 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
110 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
111 }
112
113 @Private
114 @VisibleForTesting
115 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
116 CallbackHandler callbackHandler) {
117 super(AMRMClientAsync.class.getName());
118 this.client = client;
119 this.heartbeatIntervalMs.set(intervalMs);
120 this.handler = callbackHandler;
121 }
122
123 public void setHeartbeatInterval(int interval) {
124 heartbeatIntervalMs.set(interval);
125 }
126
127 public abstract List<? extends Collection<T>> getMatchingRequests(
128 Priority priority,
129 String resourceName,
130 Resource capability);
131
132 /**
133 * Registers this application master with the resource manager. On successful
134 * registration, starts the heartbeating thread.
135 * @throws YarnException
136 * @throws IOException
137 */
138 public abstract RegisterApplicationMasterResponse registerApplicationMaster(
139 String appHostName, int appHostPort, String appTrackingUrl)
140 throws YarnException, IOException;
141
142 /**
143 * Unregister the application master. This must be called in the end.
144 * @param appStatus Success/Failure status of the master
145 * @param appMessage Diagnostics message on failure
146 * @param appTrackingUrl New URL to get master info
147 * @throws YarnException
148 * @throws IOException
149 */
150 public abstract void unregisterApplicationMaster(
151 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
152 throws YarnException, IOException;
153
154 /**
155 * Request containers for resources before calling <code>allocate</code>
156 * @param req Resource request
157 */
158 public abstract void addContainerRequest(T req);
159
160 /**
161 * Remove previous container request. The previous container request may have
162 * already been sent to the ResourceManager. So even after the remove request
163 * the app must be prepared to receive an allocation for the previous request
164 * even after the remove request
165 * @param req Resource request
166 */
167 public abstract void removeContainerRequest(T req);
168
169 /**
170 * Release containers assigned by the Resource Manager. If the app cannot use
171 * the container or wants to give up the container then it can release them.
172 * The app needs to make new requests for the released resource capability if
173 * it still needs it. eg. it released non-local resources
174 * @param containerId
175 */
176 public abstract void releaseAssignedContainer(ContainerId containerId);
177
178 /**
179 * Get the currently available resources in the cluster.
180 * A valid value is available after a call to allocate has been made
181 * @return Currently available resources
182 */
183 public abstract Resource getAvailableResources();
184
185 /**
186 * Get the current number of nodes in the cluster.
187 * A valid values is available after a call to allocate has been made
188 * @return Current number of nodes in the cluster
189 */
190 public abstract int getClusterNodeCount();
191
192 public interface CallbackHandler {
193
194 /**
195 * Called when the ResourceManager responds to a heartbeat with completed
196 * containers. If the response contains both completed containers and
197 * allocated containers, this will be called before containersAllocated.
198 */
199 public void onContainersCompleted(List<ContainerStatus> statuses);
200
201 /**
202 * Called when the ResourceManager responds to a heartbeat with allocated
203 * containers. If the response containers both completed containers and
204 * allocated containers, this will be called after containersCompleted.
205 */
206 public void onContainersAllocated(List<Container> containers);
207
208 /**
209 * Called when the ResourceManager wants the ApplicationMaster to shutdown
210 * for being out of sync etc. The ApplicationMaster should not unregister
211 * with the RM unless the ApplicationMaster wants to be the last attempt.
212 */
213 public void onShutdownRequest();
214
215 /**
216 * Called when nodes tracked by the ResourceManager have changed in health,
217 * availability etc.
218 */
219 public void onNodesUpdated(List<NodeReport> updatedNodes);
220
221 public float getProgress();
222
223 /**
224 * Called when error comes from RM communications as well as from errors in
225 * the callback itself from the app. Calling
226 * stop() is the recommended action.
227 *
228 * @param e
229 */
230 public void onError(Throwable e);
231 }
232 }