001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client;
020    
021    import java.io.IOException;
022    import java.net.InetSocketAddress;
023    import java.util.ArrayList;
024    
025    import com.google.common.base.Joiner;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.io.Text;
032    import org.apache.hadoop.security.SecurityUtil;
033    import org.apache.hadoop.security.UserGroupInformation;
034    import org.apache.hadoop.security.token.Token;
035    import org.apache.hadoop.security.token.TokenIdentifier;
036    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
037    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
038    import org.apache.hadoop.yarn.conf.HAUtil;
039    import org.apache.hadoop.yarn.conf.YarnConfiguration;
040    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
041    import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
042    
043    import com.google.common.base.Preconditions;
044    
045    @InterfaceAudience.Public
046    @InterfaceStability.Stable
047    public class ClientRMProxy<T> extends RMProxy<T>  {
048      private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
049      private static final ClientRMProxy INSTANCE = new ClientRMProxy();
050    
051      private interface ClientRMProtocols extends ApplicationClientProtocol,
052          ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
053        // Add nothing
054      }
055    
056      private ClientRMProxy(){
057        super();
058      }
059    
060      /**
061       * Create a proxy to the ResourceManager for the specified protocol.
062       * @param configuration Configuration with all the required information.
063       * @param protocol Client protocol for which proxy is being requested.
064       * @param <T> Type of proxy.
065       * @return Proxy to the ResourceManager for the specified client protocol.
066       * @throws IOException
067       */
068      public static <T> T createRMProxy(final Configuration configuration,
069          final Class<T> protocol) throws IOException {
070        return createRMProxy(configuration, protocol, INSTANCE);
071      }
072    
073      private static void setupTokens(InetSocketAddress resourceManagerAddress)
074          throws IOException {
075        // It is assumed for now that the only AMRMToken in AM's UGI is for this
076        // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
077        // default service-address, see YARN-1779.
078        for (Token<? extends TokenIdentifier> token : UserGroupInformation
079          .getCurrentUser().getTokens()) {
080          if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
081            // This token needs to be directly provided to the AMs, so set the
082            // appropriate service-name. We'll need more infrastructure when we
083            // need to set it in HA case.
084            SecurityUtil.setTokenService(token, resourceManagerAddress);
085          }
086        }
087      }
088    
089      @InterfaceAudience.Private
090      @Override
091      protected InetSocketAddress getRMAddress(YarnConfiguration conf,
092          Class<?> protocol) throws IOException {
093        if (protocol == ApplicationClientProtocol.class) {
094          return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
095              YarnConfiguration.DEFAULT_RM_ADDRESS,
096              YarnConfiguration.DEFAULT_RM_PORT);
097        } else if (protocol == ResourceManagerAdministrationProtocol.class) {
098          return conf.getSocketAddr(
099              YarnConfiguration.RM_ADMIN_ADDRESS,
100              YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
101              YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
102        } else if (protocol == ApplicationMasterProtocol.class) {
103          InetSocketAddress serviceAddr =
104              conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
105                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
106                YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
107          setupTokens(serviceAddr);
108          return serviceAddr;
109        } else {
110          String message = "Unsupported protocol found when creating the proxy " +
111              "connection to ResourceManager: " +
112              ((protocol != null) ? protocol.getClass().getName() : "null");
113          LOG.error(message);
114          throw new IllegalStateException(message);
115        }
116      }
117    
118      @InterfaceAudience.Private
119      @Override
120      protected void checkAllowedProtocols(Class<?> protocol) {
121        Preconditions.checkArgument(
122            protocol.isAssignableFrom(ClientRMProtocols.class),
123            "RM does not support this client protocol");
124      }
125    
126      /**
127       * Get the token service name to be used for RMDelegationToken. Depending
128       * on whether HA is enabled or not, this method generates the appropriate
129       * service name as a comma-separated list of service addresses.
130       *
131       * @param conf Configuration corresponding to the cluster we need the
132       *             RMDelegationToken for
133       * @return - Service name for RMDelegationToken
134       */
135      @InterfaceStability.Unstable
136      public static Text getRMDelegationTokenService(Configuration conf) {
137        if (HAUtil.isHAEnabled(conf)) {
138          // Build a list of service addresses to form the service name
139          ArrayList<String> services = new ArrayList<String>();
140          YarnConfiguration yarnConf = new YarnConfiguration(conf);
141          for (String rmId : HAUtil.getRMHAIds(conf)) {
142            // Set RM_ID to get the corresponding RM_ADDRESS
143            yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
144            services.add(SecurityUtil.buildTokenService(
145                yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
146                    YarnConfiguration.DEFAULT_RM_ADDRESS,
147                    YarnConfiguration.DEFAULT_RM_PORT)).toString());
148          }
149          return new Text(Joiner.on(',').join(services));
150        }
151    
152        // Non-HA case - no need to set RM_ID
153        return SecurityUtil.buildTokenService(
154            conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
155                YarnConfiguration.DEFAULT_RM_ADDRESS,
156                YarnConfiguration.DEFAULT_RM_PORT));
157      }
158    }