001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client.api;
020
021import java.util.concurrent.ConcurrentHashMap;
022
023import org.apache.hadoop.classification.InterfaceAudience.Private;
024import org.apache.hadoop.classification.InterfaceAudience.Public;
025import org.apache.hadoop.classification.InterfaceStability.Evolving;
026import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
027import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
028import org.apache.hadoop.yarn.api.records.Token;
029import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
030import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
031
032import com.google.common.annotations.VisibleForTesting;
033
034/**
035 * NMTokenCache manages NMTokens required for an Application Master
036 * communicating with individual NodeManagers.
037 * <p>
038 * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
039 * {@link #getSingleton()} instance of the cache.
040 * <ul>
041 *   <li>
042 *     Using the singleton instance of the cache is appropriate when running a
043 *     single ApplicationMaster in the same JVM.
044 *   </li>
045 *   <li>
046 *     When using the singleton, users don't need to do anything special,
047 *     {@link AMRMClient} and {@link NMClient} are already set up to use the
048 *     default singleton {@link NMTokenCache}
049 *     </li>
050 * </ul>
051 * If running multiple Application Masters in the same JVM, a different cache
052 * instance should be used for each Application Master.
053 * <ul>
054 *   <li>
055 *     If using the {@link AMRMClient} and the {@link NMClient}, setting up
056 *     and using an instance cache is as follows:
057 * <pre>
058 *   NMTokenCache nmTokenCache = new NMTokenCache();
059 *   AMRMClient rmClient = AMRMClient.createAMRMClient();
060 *   NMClient nmClient = NMClient.createNMClient();
061 *   nmClient.setNMTokenCache(nmTokenCache);
062 *   ...
063 * </pre>
064 *   </li>
065 *   <li>
066 *     If using the {@link AMRMClientAsync} and the {@link NMClientAsync},
067 *     setting up and using an instance cache is as follows:
068 * <pre>
069 *   NMTokenCache nmTokenCache = new NMTokenCache();
070 *   AMRMClient rmClient = AMRMClient.createAMRMClient();
071 *   NMClient nmClient = NMClient.createNMClient();
072 *   nmClient.setNMTokenCache(nmTokenCache);
073 *   AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
074 *   NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
075 *   ...
076 * </pre>
077 *   </li>
078 *   <li>
079 *     If using {@link ApplicationMasterProtocol} and
080 *     {@link ContainerManagementProtocol} directly, setting up and using an
081 *     instance cache is as follows:
082 * <pre>
083 *   NMTokenCache nmTokenCache = new NMTokenCache();
084 *   ...
085 *   ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
086 *   ...
087 *   AllocateRequest allocateRequest = ...
088 *   ...
089 *   AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
090 *   for (NMToken token : allocateResponse.getNMTokens()) {
091 *     nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
092 *   }
093 *   ...
094 *   ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
095 *   ...
096 *   nmPro.startContainer(container, containerContext);
097 *   ...
098 * </pre>
099 *   </li>
100 * </ul>
101 * It is also possible to mix the usage of a client ({@code AMRMClient} or
102 * {@code NMClient}, or the async versions of them) with a protocol proxy
103 * ({@code ContainerManagementProtocolProxy} or
104 * {@code ApplicationMasterProtocol}).
105 */
106@Public
107@Evolving
108public class NMTokenCache {
109  private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
110  
111  /**
112   * Returns the singleton NM token cache.
113   *
114   * @return the singleton NM token cache.
115   */
116  public static NMTokenCache getSingleton() {
117    return NM_TOKEN_CACHE;
118  }
119  
120  /**
121   * Returns NMToken, null if absent. Only the singleton obtained from
122   * {@link #getSingleton()} is looked at for the tokens. If you are using your
123   * own NMTokenCache that is different from the singleton, use
124   * {@link #getToken(String) }
125   * 
126   * @param nodeAddr
127   * @return {@link Token} NMToken required for communicating with node manager
128   */
129  @Public
130  public static Token getNMToken(String nodeAddr) {
131    return NM_TOKEN_CACHE.getToken(nodeAddr);
132  }
133  
134  /**
135   * Sets the NMToken for node address only in the singleton obtained from
136   * {@link #getSingleton()}. If you are using your own NMTokenCache that is
137   * different from the singleton, use {@link #setToken(String, Token) }
138   * 
139   * @param nodeAddr
140   *          node address (host:port)
141   * @param token
142   *          NMToken
143   */
144  @Public
145  public static void setNMToken(String nodeAddr, Token token) {
146    NM_TOKEN_CACHE.setToken(nodeAddr, token);
147  }
148
149  private ConcurrentHashMap<String, Token> nmTokens;
150
151  /**
152   * Creates a NM token cache instance.
153   */
154  public NMTokenCache() {
155    nmTokens = new ConcurrentHashMap<String, Token>();
156  }
157  
158  /**
159   * Returns NMToken, null if absent
160   * @param nodeAddr
161   * @return {@link Token} NMToken required for communicating with node
162   *         manager
163   */
164  @Public
165  @Evolving
166  public Token getToken(String nodeAddr) {
167    return nmTokens.get(nodeAddr);
168  }
169  
170  /**
171   * Sets the NMToken for node address
172   * @param nodeAddr node address (host:port)
173   * @param token NMToken
174   */
175  @Public
176  @Evolving
177  public void setToken(String nodeAddr, Token token) {
178    nmTokens.put(nodeAddr, token);
179  }
180  
181  /**
182   * Returns true if NMToken is present in cache.
183   */
184  @Private
185  @VisibleForTesting
186  public boolean containsToken(String nodeAddr) {
187    return nmTokens.containsKey(nodeAddr);
188  }
189  
190  /**
191   * Returns the number of NMTokens present in cache.
192   */
193  @Private
194  @VisibleForTesting
195  public int numberOfTokensInCache() {
196    return nmTokens.size();
197  }
198  
199  /**
200   * Removes NMToken for specified node manager
201   * @param nodeAddr node address (host:port)
202   */
203  @Private
204  @VisibleForTesting
205  public void removeToken(String nodeAddr) {
206    nmTokens.remove(nodeAddr);
207  }
208  
209  /**
210   * It will remove all the nm tokens from its cache
211   */
212  @Private
213  @VisibleForTesting
214  public void clearCache() {
215    nmTokens.clear();
216  }
217}