001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client.api.async; 020 021 import java.io.IOException; 022 import java.util.Collection; 023 import java.util.List; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.hadoop.classification.InterfaceAudience.Private; 027 import org.apache.hadoop.classification.InterfaceAudience.Public; 028 import org.apache.hadoop.classification.InterfaceStability.Stable; 029 import org.apache.hadoop.service.AbstractService; 030 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 031 import org.apache.hadoop.yarn.api.records.Container; 032 import org.apache.hadoop.yarn.api.records.ContainerId; 033 import org.apache.hadoop.yarn.api.records.ContainerStatus; 034 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 035 import org.apache.hadoop.yarn.api.records.NodeReport; 036 import org.apache.hadoop.yarn.api.records.Priority; 037 import org.apache.hadoop.yarn.api.records.Resource; 038 import org.apache.hadoop.yarn.client.api.AMRMClient; 039 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 040 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; 041 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; 042 import org.apache.hadoop.yarn.exceptions.YarnException; 043 044 import com.google.common.annotations.VisibleForTesting; 045 046 /** 047 * <code>AMRMClientAsync</code> handles communication with the ResourceManager 048 * and provides asynchronous updates on events such as container allocations and 049 * completions. It contains a thread that sends periodic heartbeats to the 050 * ResourceManager. 051 * 052 * It should be used by implementing a CallbackHandler: 053 * <pre> 054 * {@code 055 * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler { 056 * public void onContainersAllocated(List<Container> containers) { 057 * [run tasks on the containers] 058 * } 059 * 060 * public void onContainersCompleted(List<ContainerStatus> statuses) { 061 * [update progress, check whether app is done] 062 * } 063 * 064 * public void onNodesUpdated(List<NodeReport> updated) {} 065 * 066 * public void onReboot() {} 067 * } 068 * } 069 * </pre> 070 * 071 * The client's lifecycle should be managed similarly to the following: 072 * 073 * <pre> 074 * {@code 075 * AMRMClientAsync asyncClient = 076 * createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler()); 077 * asyncClient.init(conf); 078 * asyncClient.start(); 079 * RegisterApplicationMasterResponse response = asyncClient 080 * .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 081 * appMasterTrackingUrl); 082 * asyncClient.addContainerRequest(containerRequest); 083 * [... wait for application to complete] 084 * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl); 085 * asyncClient.stop(); 086 * } 087 * </pre> 088 */ 089 @Public 090 @Stable 091 public abstract class AMRMClientAsync<T extends ContainerRequest> 092 extends AbstractService { 093 094 protected final AMRMClient<T> client; 095 protected final CallbackHandler handler; 096 protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); 097 098 public static <T extends ContainerRequest> AMRMClientAsync<T> 099 createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 100 return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler); 101 } 102 103 public static <T extends ContainerRequest> AMRMClientAsync<T> 104 createAMRMClientAsync(AMRMClient<T> client, int intervalMs, 105 CallbackHandler callbackHandler) { 106 return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler); 107 } 108 109 protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) { 110 this(new AMRMClientImpl<T>(), intervalMs, callbackHandler); 111 } 112 113 @Private 114 @VisibleForTesting 115 protected AMRMClientAsync(AMRMClient<T> client, int intervalMs, 116 CallbackHandler callbackHandler) { 117 super(AMRMClientAsync.class.getName()); 118 this.client = client; 119 this.heartbeatIntervalMs.set(intervalMs); 120 this.handler = callbackHandler; 121 } 122 123 public void setHeartbeatInterval(int interval) { 124 heartbeatIntervalMs.set(interval); 125 } 126 127 public abstract List<? extends Collection<T>> getMatchingRequests( 128 Priority priority, 129 String resourceName, 130 Resource capability); 131 132 /** 133 * Registers this application master with the resource manager. On successful 134 * registration, starts the heartbeating thread. 135 * @throws YarnException 136 * @throws IOException 137 */ 138 public abstract RegisterApplicationMasterResponse registerApplicationMaster( 139 String appHostName, int appHostPort, String appTrackingUrl) 140 throws YarnException, IOException; 141 142 /** 143 * Unregister the application master. This must be called in the end. 144 * @param appStatus Success/Failure status of the master 145 * @param appMessage Diagnostics message on failure 146 * @param appTrackingUrl New URL to get master info 147 * @throws YarnException 148 * @throws IOException 149 */ 150 public abstract void unregisterApplicationMaster( 151 FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 152 throws YarnException, IOException; 153 154 /** 155 * Request containers for resources before calling <code>allocate</code> 156 * @param req Resource request 157 */ 158 public abstract void addContainerRequest(T req); 159 160 /** 161 * Remove previous container request. The previous container request may have 162 * already been sent to the ResourceManager. So even after the remove request 163 * the app must be prepared to receive an allocation for the previous request 164 * even after the remove request 165 * @param req Resource request 166 */ 167 public abstract void removeContainerRequest(T req); 168 169 /** 170 * Release containers assigned by the Resource Manager. If the app cannot use 171 * the container or wants to give up the container then it can release them. 172 * The app needs to make new requests for the released resource capability if 173 * it still needs it. eg. it released non-local resources 174 * @param containerId 175 */ 176 public abstract void releaseAssignedContainer(ContainerId containerId); 177 178 /** 179 * Get the currently available resources in the cluster. 180 * A valid value is available after a call to allocate has been made 181 * @return Currently available resources 182 */ 183 public abstract Resource getAvailableResources(); 184 185 /** 186 * Get the current number of nodes in the cluster. 187 * A valid values is available after a call to allocate has been made 188 * @return Current number of nodes in the cluster 189 */ 190 public abstract int getClusterNodeCount(); 191 192 public interface CallbackHandler { 193 194 /** 195 * Called when the ResourceManager responds to a heartbeat with completed 196 * containers. If the response contains both completed containers and 197 * allocated containers, this will be called before containersAllocated. 198 */ 199 public void onContainersCompleted(List<ContainerStatus> statuses); 200 201 /** 202 * Called when the ResourceManager responds to a heartbeat with allocated 203 * containers. If the response containers both completed containers and 204 * allocated containers, this will be called after containersCompleted. 205 */ 206 public void onContainersAllocated(List<Container> containers); 207 208 /** 209 * Called when the ResourceManager wants the ApplicationMaster to shutdown 210 * for being out of sync etc. The ApplicationMaster should not unregister 211 * with the RM unless the ApplicationMaster wants to be the last attempt. 212 */ 213 public void onShutdownRequest(); 214 215 /** 216 * Called when nodes tracked by the ResourceManager have changed in health, 217 * availability etc. 218 */ 219 public void onNodesUpdated(List<NodeReport> updatedNodes); 220 221 public float getProgress(); 222 223 /** 224 * Called when error comes from RM communications as well as from errors in 225 * the callback itself from the app. Calling 226 * stop() is the recommended action. 227 * 228 * @param e 229 */ 230 public void onError(Throwable e); 231 } 232 }