001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.client.api.async; 020 021import java.nio.ByteBuffer; 022import java.util.Map; 023import java.util.concurrent.ConcurrentMap; 024 025import org.apache.hadoop.classification.InterfaceAudience.Private; 026import org.apache.hadoop.classification.InterfaceAudience.Public; 027import org.apache.hadoop.classification.InterfaceStability.Stable; 028import org.apache.hadoop.service.AbstractService; 029import org.apache.hadoop.yarn.api.records.Container; 030import org.apache.hadoop.yarn.api.records.ContainerId; 031import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 032import org.apache.hadoop.yarn.api.records.ContainerStatus; 033import org.apache.hadoop.yarn.api.records.NodeId; 034import org.apache.hadoop.yarn.api.records.Token; 035import org.apache.hadoop.yarn.client.api.NMClient; 036import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 037import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; 038import org.apache.hadoop.yarn.conf.YarnConfiguration; 039 040import com.google.common.annotations.VisibleForTesting; 041 042/** 043 * <code>NMClientAsync</code> handles communication with all the NodeManagers 044 * and provides asynchronous updates on getting responses from them. It 045 * maintains a thread pool to communicate with individual NMs where a number of 046 * worker threads process requests to NMs by using {@link NMClientImpl}. The max 047 * size of the thread pool is configurable through 048 * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}. 049 * 050 * It should be used in conjunction with a CallbackHandler. For example 051 * 052 * <pre> 053 * {@code 054 * class MyCallbackHandler implements NMClientAsync.CallbackHandler { 055 * public void onContainerStarted(ContainerId containerId, 056 * Map<String, ByteBuffer> allServiceResponse) { 057 * [post process after the container is started, process the response] 058 * } 059 * 060 * public void onContainerStatusReceived(ContainerId containerId, 061 * ContainerStatus containerStatus) { 062 * [make use of the status of the container] 063 * } 064 * 065 * public void onContainerStopped(ContainerId containerId) { 066 * [post process after the container is stopped] 067 * } 068 * 069 * public void onStartContainerError( 070 * ContainerId containerId, Throwable t) { 071 * [handle the raised exception] 072 * } 073 * 074 * public void onGetContainerStatusError( 075 * ContainerId containerId, Throwable t) { 076 * [handle the raised exception] 077 * } 078 * 079 * public void onStopContainerError( 080 * ContainerId containerId, Throwable t) { 081 * [handle the raised exception] 082 * } 083 * } 084 * } 085 * </pre> 086 * 087 * The client's life-cycle should be managed like the following: 088 * 089 * <pre> 090 * {@code 091 * NMClientAsync asyncClient = 092 * NMClientAsync.createNMClientAsync(new MyCallbackhandler()); 093 * asyncClient.init(conf); 094 * asyncClient.start(); 095 * asyncClient.startContainer(container, containerLaunchContext); 096 * [... wait for container being started] 097 * asyncClient.getContainerStatus(container.getId(), container.getNodeId(), 098 * container.getContainerToken()); 099 * [... handle the status in the callback instance] 100 * asyncClient.stopContainer(container.getId(), container.getNodeId(), 101 * container.getContainerToken()); 102 * [... wait for container being stopped] 103 * asyncClient.stop(); 104 * } 105 * </pre> 106 */ 107@Public 108@Stable 109public abstract class NMClientAsync extends AbstractService { 110 111 protected NMClient client; 112 protected CallbackHandler callbackHandler; 113 114 public static NMClientAsync createNMClientAsync( 115 CallbackHandler callbackHandler) { 116 return new NMClientAsyncImpl(callbackHandler); 117 } 118 119 protected NMClientAsync(CallbackHandler callbackHandler) { 120 this (NMClientAsync.class.getName(), callbackHandler); 121 } 122 123 protected NMClientAsync(String name, CallbackHandler callbackHandler) { 124 this (name, new NMClientImpl(), callbackHandler); 125 } 126 127 @Private 128 @VisibleForTesting 129 protected NMClientAsync(String name, NMClient client, 130 CallbackHandler callbackHandler) { 131 super(name); 132 this.setClient(client); 133 this.setCallbackHandler(callbackHandler); 134 } 135 136 public abstract void startContainerAsync( 137 Container container, ContainerLaunchContext containerLaunchContext); 138 139 public abstract void stopContainerAsync( 140 ContainerId containerId, NodeId nodeId); 141 142 public abstract void getContainerStatusAsync( 143 ContainerId containerId, NodeId nodeId); 144 145 public NMClient getClient() { 146 return client; 147 } 148 149 public void setClient(NMClient client) { 150 this.client = client; 151 } 152 153 public CallbackHandler getCallbackHandler() { 154 return callbackHandler; 155 } 156 157 public void setCallbackHandler(CallbackHandler callbackHandler) { 158 this.callbackHandler = callbackHandler; 159 } 160 161 /** 162 * <p> 163 * The callback interface needs to be implemented by {@link NMClientAsync} 164 * users. The APIs are called when responses from <code>NodeManager</code> are 165 * available. 166 * </p> 167 * 168 * <p> 169 * Once a callback happens, the users can chose to act on it in blocking or 170 * non-blocking manner. If the action on callback is done in a blocking 171 * manner, some of the threads performing requests on NodeManagers may get 172 * blocked depending on how many threads in the pool are busy. 173 * </p> 174 * 175 * <p> 176 * The implementation of the callback function should not throw the 177 * unexpected exception. Otherwise, {@link NMClientAsync} will just 178 * catch, log and then ignore it. 179 * </p> 180 */ 181 public static interface CallbackHandler { 182 /** 183 * The API is called when <code>NodeManager</code> responds to indicate its 184 * acceptance of the starting container request 185 * @param containerId the Id of the container 186 * @param allServiceResponse a Map between the auxiliary service names and 187 * their outputs 188 */ 189 void onContainerStarted(ContainerId containerId, 190 Map<String, ByteBuffer> allServiceResponse); 191 192 /** 193 * The API is called when <code>NodeManager</code> responds with the status 194 * of the container 195 * @param containerId the Id of the container 196 * @param containerStatus the status of the container 197 */ 198 void onContainerStatusReceived(ContainerId containerId, 199 ContainerStatus containerStatus); 200 201 /** 202 * The API is called when <code>NodeManager</code> responds to indicate the 203 * container is stopped. 204 * @param containerId the Id of the container 205 */ 206 void onContainerStopped(ContainerId containerId); 207 208 /** 209 * The API is called when an exception is raised in the process of 210 * starting a container 211 * 212 * @param containerId the Id of the container 213 * @param t the raised exception 214 */ 215 void onStartContainerError(ContainerId containerId, Throwable t); 216 217 /** 218 * The API is called when an exception is raised in the process of 219 * querying the status of a container 220 * 221 * @param containerId the Id of the container 222 * @param t the raised exception 223 */ 224 void onGetContainerStatusError(ContainerId containerId, Throwable t); 225 226 /** 227 * The API is called when an exception is raised in the process of 228 * stopping a container 229 * 230 * @param containerId the Id of the container 231 * @param t the raised exception 232 */ 233 void onStopContainerError(ContainerId containerId, Throwable t); 234 235 } 236 237}