001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client;
020    
021    import java.io.IOException;
022    import java.net.InetSocketAddress;
023    import java.util.ArrayList;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceAudience.Private;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.classification.InterfaceStability.Unstable;
031    import org.apache.hadoop.conf.Configuration;
032    import org.apache.hadoop.io.Text;
033    import org.apache.hadoop.security.SecurityUtil;
034    import org.apache.hadoop.security.UserGroupInformation;
035    import org.apache.hadoop.security.token.Token;
036    import org.apache.hadoop.security.token.TokenIdentifier;
037    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
038    import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
039    import org.apache.hadoop.yarn.conf.HAUtil;
040    import org.apache.hadoop.yarn.conf.YarnConfiguration;
041    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
042    import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
043    
044    import com.google.common.base.Joiner;
045    import com.google.common.base.Preconditions;
046    
047    @InterfaceAudience.Public
048    @InterfaceStability.Stable
049    public class ClientRMProxy<T> extends RMProxy<T>  {
050      private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
051      private static final ClientRMProxy INSTANCE = new ClientRMProxy();
052    
053      private interface ClientRMProtocols extends ApplicationClientProtocol,
054          ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
055        // Add nothing
056      }
057    
058      private ClientRMProxy(){
059        super();
060      }
061    
062      /**
063       * Create a proxy to the ResourceManager for the specified protocol.
064       * @param configuration Configuration with all the required information.
065       * @param protocol Client protocol for which proxy is being requested.
066       * @param <T> Type of proxy.
067       * @return Proxy to the ResourceManager for the specified client protocol.
068       * @throws IOException
069       */
070      public static <T> T createRMProxy(final Configuration configuration,
071          final Class<T> protocol) throws IOException {
072        return createRMProxy(configuration, protocol, INSTANCE);
073      }
074    
075      private static void setAMRMTokenService(final Configuration conf)
076          throws IOException {
077        for (Token<? extends TokenIdentifier> token : UserGroupInformation
078          .getCurrentUser().getTokens()) {
079          if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
080            token.setService(getAMRMTokenService(conf));
081          }
082        }
083      }
084    
085      @Private
086      @Override
087      protected InetSocketAddress getRMAddress(YarnConfiguration conf,
088          Class<?> protocol) throws IOException {
089        if (protocol == ApplicationClientProtocol.class) {
090          return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
091              YarnConfiguration.DEFAULT_RM_ADDRESS,
092              YarnConfiguration.DEFAULT_RM_PORT);
093        } else if (protocol == ResourceManagerAdministrationProtocol.class) {
094          return conf.getSocketAddr(
095              YarnConfiguration.RM_ADMIN_ADDRESS,
096              YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
097              YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
098        } else if (protocol == ApplicationMasterProtocol.class) {
099          setAMRMTokenService(conf);
100          return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
101              YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
102              YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
103        } else {
104          String message = "Unsupported protocol found when creating the proxy " +
105              "connection to ResourceManager: " +
106              ((protocol != null) ? protocol.getClass().getName() : "null");
107          LOG.error(message);
108          throw new IllegalStateException(message);
109        }
110      }
111    
112      @Private
113      @Override
114      protected void checkAllowedProtocols(Class<?> protocol) {
115        Preconditions.checkArgument(
116            protocol.isAssignableFrom(ClientRMProtocols.class),
117            "RM does not support this client protocol");
118      }
119    
120      /**
121       * Get the token service name to be used for RMDelegationToken. Depending
122       * on whether HA is enabled or not, this method generates the appropriate
123       * service name as a comma-separated list of service addresses.
124       *
125       * @param conf Configuration corresponding to the cluster we need the
126       *             RMDelegationToken for
127       * @return - Service name for RMDelegationToken
128       */
129      @Unstable
130      public static Text getRMDelegationTokenService(Configuration conf) {
131        return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
132                YarnConfiguration.DEFAULT_RM_ADDRESS,
133                YarnConfiguration.DEFAULT_RM_PORT);
134      }
135    
136      @Unstable
137      public static Text getAMRMTokenService(Configuration conf) {
138        return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
139                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
140                YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
141      }
142    
143      @Unstable
144      public static Text getTokenService(Configuration conf, String address,
145          String defaultAddr, int defaultPort) {
146        if (HAUtil.isHAEnabled(conf)) {
147          // Build a list of service addresses to form the service name
148          ArrayList<String> services = new ArrayList<String>();
149          YarnConfiguration yarnConf = new YarnConfiguration(conf);
150          for (String rmId : HAUtil.getRMHAIds(conf)) {
151            // Set RM_ID to get the corresponding RM_ADDRESS
152            yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
153            services.add(SecurityUtil.buildTokenService(
154                yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
155                .toString());
156          }
157          return new Text(Joiner.on(',').join(services));
158        }
159    
160        // Non-HA case - no need to set RM_ID
161        return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
162          defaultAddr, defaultPort));
163      }
164    }