001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client.api; 020 021 import java.util.concurrent.ConcurrentHashMap; 022 023 import org.apache.hadoop.classification.InterfaceAudience.Private; 024 import org.apache.hadoop.classification.InterfaceAudience.Public; 025 import org.apache.hadoop.classification.InterfaceStability.Evolving; 026 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 027 import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 028 import org.apache.hadoop.yarn.api.records.Token; 029 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 030 import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 031 032 import com.google.common.annotations.VisibleForTesting; 033 034 /** 035 * NMTokenCache manages NMTokens required for an Application Master 036 * communicating with individual NodeManagers. 037 * <p/> 038 * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use 039 * {@link #getSingleton()} instance of the cache. 040 * <ul> 041 * <li>Using the singleton instance of the cache is appropriate when running a 042 * single ApplicationMaster in the same JVM.</li> 043 * <li>When using the singleton, users don't need to do anything special, 044 * {@link AMRMClient} and {@link NMClient} are already set up to use the default 045 * singleton {@link NMTokenCache}</li> 046 * </ul> 047 * <p/> 048 * If running multiple Application Masters in the same JVM, a different cache 049 * instance should be used for each Application Master. 050 * <p/> 051 * <ul> 052 * <li> 053 * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using 054 * an instance cache is as follows: 055 * <p/> 056 * 057 * <pre> 058 * NMTokenCache nmTokenCache = new NMTokenCache(); 059 * AMRMClient rmClient = AMRMClient.createAMRMClient(); 060 * NMClient nmClient = NMClient.createNMClient(); 061 * nmClient.setNMTokenCache(nmTokenCache); 062 * ... 063 * </pre> 064 * </li> 065 * <li> 066 * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up 067 * and using an instance cache is as follows: 068 * <p/> 069 * 070 * <pre> 071 * NMTokenCache nmTokenCache = new NMTokenCache(); 072 * AMRMClient rmClient = AMRMClient.createAMRMClient(); 073 * NMClient nmClient = NMClient.createNMClient(); 074 * nmClient.setNMTokenCache(nmTokenCache); 075 * AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]); 076 * NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]); 077 * ... 078 * </pre> 079 * </li> 080 * <li> 081 * If using {@link ApplicationMasterProtocol} and 082 * {@link ContainerManagementProtocol} directly, setting up and using an 083 * instance cache is as follows: 084 * <p/> 085 * 086 * <pre> 087 * NMTokenCache nmTokenCache = new NMTokenCache(); 088 * ... 089 * ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); 090 * ... 091 * AllocateRequest allocateRequest = ... 092 * ... 093 * AllocateResponse allocateResponse = rmClient.allocate(allocateRequest); 094 * for (NMToken token : allocateResponse.getNMTokens()) { 095 * nmTokenCache.setToken(token.getNodeId().toString(), token.getToken()); 096 * } 097 * ... 098 * ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache); 099 * ... 100 * nmPro.startContainer(container, containerContext); 101 * ... 102 * </pre> 103 * </li> 104 * </ul> 105 * It is also possible to mix the usage of a client (<code>AMRMClient</code> or 106 * <code>NMClient</code>, or the async versions of them) with a protocol proxy ( 107 * <code>ContainerManagementProtocolProxy</code> or 108 * <code>ApplicationMasterProtocol</code>). 109 */ 110 @Public 111 @Evolving 112 public class NMTokenCache { 113 private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache(); 114 115 /** 116 * Returns the singleton NM token cache. 117 * 118 * @return the singleton NM token cache. 119 */ 120 public static NMTokenCache getSingleton() { 121 return NM_TOKEN_CACHE; 122 } 123 124 /** 125 * Returns NMToken, null if absent. Only the singleton obtained from 126 * {@link #getSingleton()} is looked at for the tokens. If you are using your 127 * own NMTokenCache that is different from the singleton, use 128 * {@link #getToken(String) } 129 * 130 * @param nodeAddr 131 * @return {@link Token} NMToken required for communicating with node manager 132 */ 133 @Public 134 public static Token getNMToken(String nodeAddr) { 135 return NM_TOKEN_CACHE.getToken(nodeAddr); 136 } 137 138 /** 139 * Sets the NMToken for node address only in the singleton obtained from 140 * {@link #getSingleton()}. If you are using your own NMTokenCache that is 141 * different from the singleton, use {@link #setToken(String, Token) } 142 * 143 * @param nodeAddr 144 * node address (host:port) 145 * @param token 146 * NMToken 147 */ 148 @Public 149 public static void setNMToken(String nodeAddr, Token token) { 150 NM_TOKEN_CACHE.setToken(nodeAddr, token); 151 } 152 153 private ConcurrentHashMap<String, Token> nmTokens; 154 155 /** 156 * Creates a NM token cache instance. 157 */ 158 public NMTokenCache() { 159 nmTokens = new ConcurrentHashMap<String, Token>(); 160 } 161 162 /** 163 * Returns NMToken, null if absent 164 * @param nodeAddr 165 * @return {@link Token} NMToken required for communicating with node 166 * manager 167 */ 168 @Public 169 @Evolving 170 public Token getToken(String nodeAddr) { 171 return nmTokens.get(nodeAddr); 172 } 173 174 /** 175 * Sets the NMToken for node address 176 * @param nodeAddr node address (host:port) 177 * @param token NMToken 178 */ 179 @Public 180 @Evolving 181 public void setToken(String nodeAddr, Token token) { 182 nmTokens.put(nodeAddr, token); 183 } 184 185 /** 186 * Returns true if NMToken is present in cache. 187 */ 188 @Private 189 @VisibleForTesting 190 public boolean containsToken(String nodeAddr) { 191 return nmTokens.containsKey(nodeAddr); 192 } 193 194 /** 195 * Returns the number of NMTokens present in cache. 196 */ 197 @Private 198 @VisibleForTesting 199 public int numberOfTokensInCache() { 200 return nmTokens.size(); 201 } 202 203 /** 204 * Removes NMToken for specified node manager 205 * @param nodeAddr node address (host:port) 206 */ 207 @Private 208 @VisibleForTesting 209 public void removeToken(String nodeAddr) { 210 nmTokens.remove(nodeAddr); 211 } 212 213 /** 214 * It will remove all the nm tokens from its cache 215 */ 216 @Private 217 @VisibleForTesting 218 public void clearCache() { 219 nmTokens.clear(); 220 } 221 }