001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client.api;
020    
021    import java.util.concurrent.ConcurrentHashMap;
022    
023    import org.apache.hadoop.classification.InterfaceAudience.Private;
024    import org.apache.hadoop.classification.InterfaceAudience.Public;
025    import org.apache.hadoop.classification.InterfaceStability.Evolving;
026    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
027    import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
028    import org.apache.hadoop.yarn.api.records.Token;
029    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
030    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
031    
032    import com.google.common.annotations.VisibleForTesting;
033    
034    /**
035     * NMTokenCache manages NMTokens required for an Application Master
036     * communicating with individual NodeManagers.
037     * <p/>
038     * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
039     * {@link #getSingleton()} instance of the cache.
040     * <ul>
041     * <li>Using the singleton instance of the cache is appropriate when running a
042     * single ApplicationMaster in the same JVM.</li>
043     * <li>When using the singleton, users don't need to do anything special,
044     * {@link AMRMClient} and {@link NMClient} are already set up to use the default
045     * singleton {@link NMTokenCache}</li>
046     * </ul>
047     * <p/>
048     * If running multiple Application Masters in the same JVM, a different cache
049     * instance should be used for each Application Master.
050     * <p/>
051     * <ul>
052     * <li>
053     * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using
054     * an instance cache is as follows:
055     * <p/>
056     * 
057     * <pre>
058     *   NMTokenCache nmTokenCache = new NMTokenCache();
059     *   AMRMClient rmClient = AMRMClient.createAMRMClient();
060     *   NMClient nmClient = NMClient.createNMClient();
061     *   nmClient.setNMTokenCache(nmTokenCache);
062     *   ...
063     * </pre>
064     * </li>
065     * <li>
066     * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up
067     * and using an instance cache is as follows:
068     * <p/>
069     * 
070     * <pre>
071     *   NMTokenCache nmTokenCache = new NMTokenCache();
072     *   AMRMClient rmClient = AMRMClient.createAMRMClient();
073     *   NMClient nmClient = NMClient.createNMClient();
074     *   nmClient.setNMTokenCache(nmTokenCache);
075     *   AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
076     *   NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
077     *   ...
078     * </pre>
079     * </li>
080     * <li>
081     * If using {@link ApplicationMasterProtocol} and
082     * {@link ContainerManagementProtocol} directly, setting up and using an
083     * instance cache is as follows:
084     * <p/>
085     * 
086     * <pre>
087     *   NMTokenCache nmTokenCache = new NMTokenCache();
088     *   ...
089     *   ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
090     *   ...
091     *   AllocateRequest allocateRequest = ...
092     *   ...
093     *   AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
094     *   for (NMToken token : allocateResponse.getNMTokens()) {
095     *     nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
096     *   }
097     *   ...
098     *   ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
099     *   ...
100     *   nmPro.startContainer(container, containerContext);
101     *   ...
102     * </pre>
103     * </li>
104     * </ul>
105     * It is also possible to mix the usage of a client (<code>AMRMClient</code> or
106     * <code>NMClient</code>, or the async versions of them) with a protocol proxy (
107     * <code>ContainerManagementProtocolProxy</code> or
108     * <code>ApplicationMasterProtocol</code>).
109     */
110    @Public
111    @Evolving
112    public class NMTokenCache {
113      private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
114      
115      /**
116       * Returns the singleton NM token cache.
117       *
118       * @return the singleton NM token cache.
119       */
120      public static NMTokenCache getSingleton() {
121        return NM_TOKEN_CACHE;
122      }
123      
124      /**
125       * Returns NMToken, null if absent. Only the singleton obtained from
126       * {@link #getSingleton()} is looked at for the tokens. If you are using your
127       * own NMTokenCache that is different from the singleton, use
128       * {@link #getToken(String) }
129       * 
130       * @param nodeAddr
131       * @return {@link Token} NMToken required for communicating with node manager
132       */
133      @Public
134      public static Token getNMToken(String nodeAddr) {
135        return NM_TOKEN_CACHE.getToken(nodeAddr);
136      }
137      
138      /**
139       * Sets the NMToken for node address only in the singleton obtained from
140       * {@link #getSingleton()}. If you are using your own NMTokenCache that is
141       * different from the singleton, use {@link #setToken(String, Token) }
142       * 
143       * @param nodeAddr
144       *          node address (host:port)
145       * @param token
146       *          NMToken
147       */
148      @Public
149      public static void setNMToken(String nodeAddr, Token token) {
150        NM_TOKEN_CACHE.setToken(nodeAddr, token);
151      }
152    
153      private ConcurrentHashMap<String, Token> nmTokens;
154    
155      /**
156       * Creates a NM token cache instance.
157       */
158      public NMTokenCache() {
159        nmTokens = new ConcurrentHashMap<String, Token>();
160      }
161      
162      /**
163       * Returns NMToken, null if absent
164       * @param nodeAddr
165       * @return {@link Token} NMToken required for communicating with node
166       *         manager
167       */
168      @Public
169      @Evolving
170      public Token getToken(String nodeAddr) {
171        return nmTokens.get(nodeAddr);
172      }
173      
174      /**
175       * Sets the NMToken for node address
176       * @param nodeAddr node address (host:port)
177       * @param token NMToken
178       */
179      @Public
180      @Evolving
181      public void setToken(String nodeAddr, Token token) {
182        nmTokens.put(nodeAddr, token);
183      }
184      
185      /**
186       * Returns true if NMToken is present in cache.
187       */
188      @Private
189      @VisibleForTesting
190      public boolean containsToken(String nodeAddr) {
191        return nmTokens.containsKey(nodeAddr);
192      }
193      
194      /**
195       * Returns the number of NMTokens present in cache.
196       */
197      @Private
198      @VisibleForTesting
199      public int numberOfTokensInCache() {
200        return nmTokens.size();
201      }
202      
203      /**
204       * Removes NMToken for specified node manager
205       * @param nodeAddr node address (host:port)
206       */
207      @Private
208      @VisibleForTesting
209      public void removeToken(String nodeAddr) {
210        nmTokens.remove(nodeAddr);
211      }
212      
213      /**
214       * It will remove all the nm tokens from its cache
215       */
216      @Private
217      @VisibleForTesting
218      public void clearCache() {
219        nmTokens.clear();
220      }
221    }