001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client.api;
020
021import java.util.concurrent.ConcurrentHashMap;
022
023import org.apache.hadoop.classification.InterfaceAudience.Private;
024import org.apache.hadoop.classification.InterfaceAudience.Public;
025import org.apache.hadoop.classification.InterfaceStability.Evolving;
026import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
027import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
028import org.apache.hadoop.yarn.api.records.Token;
029import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
030import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
031
032import com.google.common.annotations.VisibleForTesting;
033
034/**
035 * NMTokenCache manages NMTokens required for an Application Master
036 * communicating with individual NodeManagers.
037 * <p/>
038 * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
039 * {@link #getSingleton()} instance of the cache.
040 * <ul>
041 * <li>Using the singleton instance of the cache is appropriate when running a
042 * single ApplicationMaster in the same JVM.</li>
043 * <li>When using the singleton, users don't need to do anything special,
044 * {@link AMRMClient} and {@link NMClient} are already set up to use the default
045 * singleton {@link NMTokenCache}</li>
046 * </ul>
047 * <p/>
048 * If running multiple Application Masters in the same JVM, a different cache
049 * instance should be used for each Application Master.
050 * <p/>
051 * <ul>
052 * <li>
053 * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using
054 * an instance cache is as follows:
055 * <p/>
056 * 
057 * <pre>
058 *   NMTokenCache nmTokenCache = new NMTokenCache();
059 *   AMRMClient rmClient = AMRMClient.createAMRMClient();
060 *   NMClient nmClient = NMClient.createNMClient();
061 *   nmClient.setNMTokenCache(nmTokenCache);
062 *   ...
063 * </pre>
064 * </li>
065 * <li>
066 * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up
067 * and using an instance cache is as follows:
068 * <p/>
069 * 
070 * <pre>
071 *   NMTokenCache nmTokenCache = new NMTokenCache();
072 *   AMRMClient rmClient = AMRMClient.createAMRMClient();
073 *   NMClient nmClient = NMClient.createNMClient();
074 *   nmClient.setNMTokenCache(nmTokenCache);
075 *   AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
076 *   NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
077 *   ...
078 * </pre>
079 * </li>
080 * <li>
081 * If using {@link ApplicationMasterProtocol} and
082 * {@link ContainerManagementProtocol} directly, setting up and using an
083 * instance cache is as follows:
084 * <p/>
085 * 
086 * <pre>
087 *   NMTokenCache nmTokenCache = new NMTokenCache();
088 *   ...
089 *   ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
090 *   ...
091 *   AllocateRequest allocateRequest = ...
092 *   ...
093 *   AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
094 *   for (NMToken token : allocateResponse.getNMTokens()) {
095 *     nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
096 *   }
097 *   ...
098 *   ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
099 *   ...
100 *   nmPro.startContainer(container, containerContext);
101 *   ...
102 * </pre>
103 * </li>
104 * </ul>
105 * It is also possible to mix the usage of a client (<code>AMRMClient</code> or
106 * <code>NMClient</code>, or the async versions of them) with a protocol proxy (
107 * <code>ContainerManagementProtocolProxy</code> or
108 * <code>ApplicationMasterProtocol</code>).
109 */
110@Public
111@Evolving
112public class NMTokenCache {
113  private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
114  
115  /**
116   * Returns the singleton NM token cache.
117   *
118   * @return the singleton NM token cache.
119   */
120  public static NMTokenCache getSingleton() {
121    return NM_TOKEN_CACHE;
122  }
123  
124  /**
125   * Returns NMToken, null if absent. Only the singleton obtained from
126   * {@link #getSingleton()} is looked at for the tokens. If you are using your
127   * own NMTokenCache that is different from the singleton, use
128   * {@link #getToken(String) }
129   * 
130   * @param nodeAddr
131   * @return {@link Token} NMToken required for communicating with node manager
132   */
133  @Public
134  public static Token getNMToken(String nodeAddr) {
135    return NM_TOKEN_CACHE.getToken(nodeAddr);
136  }
137  
138  /**
139   * Sets the NMToken for node address only in the singleton obtained from
140   * {@link #getSingleton()}. If you are using your own NMTokenCache that is
141   * different from the singleton, use {@link #setToken(String, Token) }
142   * 
143   * @param nodeAddr
144   *          node address (host:port)
145   * @param token
146   *          NMToken
147   */
148  @Public
149  public static void setNMToken(String nodeAddr, Token token) {
150    NM_TOKEN_CACHE.setToken(nodeAddr, token);
151  }
152
153  private ConcurrentHashMap<String, Token> nmTokens;
154
155  /**
156   * Creates a NM token cache instance.
157   */
158  public NMTokenCache() {
159    nmTokens = new ConcurrentHashMap<String, Token>();
160  }
161  
162  /**
163   * Returns NMToken, null if absent
164   * @param nodeAddr
165   * @return {@link Token} NMToken required for communicating with node
166   *         manager
167   */
168  @Public
169  @Evolving
170  public Token getToken(String nodeAddr) {
171    return nmTokens.get(nodeAddr);
172  }
173  
174  /**
175   * Sets the NMToken for node address
176   * @param nodeAddr node address (host:port)
177   * @param token NMToken
178   */
179  @Public
180  @Evolving
181  public void setToken(String nodeAddr, Token token) {
182    nmTokens.put(nodeAddr, token);
183  }
184  
185  /**
186   * Returns true if NMToken is present in cache.
187   */
188  @Private
189  @VisibleForTesting
190  public boolean containsToken(String nodeAddr) {
191    return nmTokens.containsKey(nodeAddr);
192  }
193  
194  /**
195   * Returns the number of NMTokens present in cache.
196   */
197  @Private
198  @VisibleForTesting
199  public int numberOfTokensInCache() {
200    return nmTokens.size();
201  }
202  
203  /**
204   * Removes NMToken for specified node manager
205   * @param nodeAddr node address (host:port)
206   */
207  @Private
208  @VisibleForTesting
209  public void removeToken(String nodeAddr) {
210    nmTokens.remove(nodeAddr);
211  }
212  
213  /**
214   * It will remove all the nm tokens from its cache
215   */
216  @Private
217  @VisibleForTesting
218  public void clearCache() {
219    nmTokens.clear();
220  }
221}