001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.client;
020
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.util.ArrayList;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceAudience.Private;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.classification.InterfaceStability.Unstable;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.io.Text;
033import org.apache.hadoop.security.SecurityUtil;
034import org.apache.hadoop.security.UserGroupInformation;
035import org.apache.hadoop.security.token.Token;
036import org.apache.hadoop.security.token.TokenIdentifier;
037import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
038import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
039import org.apache.hadoop.yarn.conf.HAUtil;
040import org.apache.hadoop.yarn.conf.YarnConfiguration;
041import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
042import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
043
044import com.google.common.base.Joiner;
045import com.google.common.base.Preconditions;
046
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public class ClientRMProxy<T> extends RMProxy<T>  {
050  private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
051  private static final ClientRMProxy INSTANCE = new ClientRMProxy();
052
053  private interface ClientRMProtocols extends ApplicationClientProtocol,
054      ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
055    // Add nothing
056  }
057
058  private ClientRMProxy(){
059    super();
060  }
061
062  /**
063   * Create a proxy to the ResourceManager for the specified protocol.
064   * @param configuration Configuration with all the required information.
065   * @param protocol Client protocol for which proxy is being requested.
066   * @param <T> Type of proxy.
067   * @return Proxy to the ResourceManager for the specified client protocol.
068   * @throws IOException
069   */
070  public static <T> T createRMProxy(final Configuration configuration,
071      final Class<T> protocol) throws IOException {
072    return createRMProxy(configuration, protocol, INSTANCE);
073  }
074
075  private static void setAMRMTokenService(final Configuration conf)
076      throws IOException {
077    for (Token<? extends TokenIdentifier> token : UserGroupInformation
078      .getCurrentUser().getTokens()) {
079      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
080        token.setService(getAMRMTokenService(conf));
081      }
082    }
083  }
084
085  @Private
086  @Override
087  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
088      Class<?> protocol) throws IOException {
089    if (protocol == ApplicationClientProtocol.class) {
090      return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
091          YarnConfiguration.DEFAULT_RM_ADDRESS,
092          YarnConfiguration.DEFAULT_RM_PORT);
093    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
094      return conf.getSocketAddr(
095          YarnConfiguration.RM_ADMIN_ADDRESS,
096          YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
097          YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
098    } else if (protocol == ApplicationMasterProtocol.class) {
099      setAMRMTokenService(conf);
100      return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
101          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
102          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
103    } else {
104      String message = "Unsupported protocol found when creating the proxy " +
105          "connection to ResourceManager: " +
106          ((protocol != null) ? protocol.getClass().getName() : "null");
107      LOG.error(message);
108      throw new IllegalStateException(message);
109    }
110  }
111
112  @Private
113  @Override
114  protected void checkAllowedProtocols(Class<?> protocol) {
115    Preconditions.checkArgument(
116        protocol.isAssignableFrom(ClientRMProtocols.class),
117        "RM does not support this client protocol");
118  }
119
120  /**
121   * Get the token service name to be used for RMDelegationToken. Depending
122   * on whether HA is enabled or not, this method generates the appropriate
123   * service name as a comma-separated list of service addresses.
124   *
125   * @param conf Configuration corresponding to the cluster we need the
126   *             RMDelegationToken for
127   * @return - Service name for RMDelegationToken
128   */
129  @Unstable
130  public static Text getRMDelegationTokenService(Configuration conf) {
131    return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
132            YarnConfiguration.DEFAULT_RM_ADDRESS,
133            YarnConfiguration.DEFAULT_RM_PORT);
134  }
135
136  @Unstable
137  public static Text getAMRMTokenService(Configuration conf) {
138    return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
139            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
140            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
141  }
142
143  @Unstable
144  public static Text getTokenService(Configuration conf, String address,
145      String defaultAddr, int defaultPort) {
146    if (HAUtil.isHAEnabled(conf)) {
147      // Build a list of service addresses to form the service name
148      ArrayList<String> services = new ArrayList<String>();
149      YarnConfiguration yarnConf = new YarnConfiguration(conf);
150      for (String rmId : HAUtil.getRMHAIds(conf)) {
151        // Set RM_ID to get the corresponding RM_ADDRESS
152        yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
153        services.add(SecurityUtil.buildTokenService(
154            yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
155            .toString());
156      }
157      return new Text(Joiner.on(',').join(services));
158    }
159
160    // Non-HA case - no need to set RM_ID
161    return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
162      defaultAddr, defaultPort));
163  }
164}