001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client;
020
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.util.ArrayList;
024
025 import com.google.common.base.Joiner;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.io.Text;
032 import org.apache.hadoop.security.SecurityUtil;
033 import org.apache.hadoop.security.UserGroupInformation;
034 import org.apache.hadoop.security.token.Token;
035 import org.apache.hadoop.security.token.TokenIdentifier;
036 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
037 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
038 import org.apache.hadoop.yarn.conf.HAUtil;
039 import org.apache.hadoop.yarn.conf.YarnConfiguration;
040 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
041 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
042
043 import com.google.common.base.Preconditions;
044
045 @InterfaceAudience.Public
046 @InterfaceStability.Stable
047 public class ClientRMProxy<T> extends RMProxy<T> {
048 private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
049 private static final ClientRMProxy INSTANCE = new ClientRMProxy();
050
051 private interface ClientRMProtocols extends ApplicationClientProtocol,
052 ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
053 // Add nothing
054 }
055
056 private ClientRMProxy(){
057 super();
058 }
059
060 /**
061 * Create a proxy to the ResourceManager for the specified protocol.
062 * @param configuration Configuration with all the required information.
063 * @param protocol Client protocol for which proxy is being requested.
064 * @param <T> Type of proxy.
065 * @return Proxy to the ResourceManager for the specified client protocol.
066 * @throws IOException
067 */
068 public static <T> T createRMProxy(final Configuration configuration,
069 final Class<T> protocol) throws IOException {
070 return createRMProxy(configuration, protocol, INSTANCE);
071 }
072
073 private static void setupTokens(InetSocketAddress resourceManagerAddress)
074 throws IOException {
075 // It is assumed for now that the only AMRMToken in AM's UGI is for this
076 // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
077 // default service-address, see YARN-1779.
078 for (Token<? extends TokenIdentifier> token : UserGroupInformation
079 .getCurrentUser().getTokens()) {
080 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
081 // This token needs to be directly provided to the AMs, so set the
082 // appropriate service-name. We'll need more infrastructure when we
083 // need to set it in HA case.
084 SecurityUtil.setTokenService(token, resourceManagerAddress);
085 }
086 }
087 }
088
089 @InterfaceAudience.Private
090 @Override
091 protected InetSocketAddress getRMAddress(YarnConfiguration conf,
092 Class<?> protocol) throws IOException {
093 if (protocol == ApplicationClientProtocol.class) {
094 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
095 YarnConfiguration.DEFAULT_RM_ADDRESS,
096 YarnConfiguration.DEFAULT_RM_PORT);
097 } else if (protocol == ResourceManagerAdministrationProtocol.class) {
098 return conf.getSocketAddr(
099 YarnConfiguration.RM_ADMIN_ADDRESS,
100 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
101 YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
102 } else if (protocol == ApplicationMasterProtocol.class) {
103 InetSocketAddress serviceAddr =
104 conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
105 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
106 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
107 setupTokens(serviceAddr);
108 return serviceAddr;
109 } else {
110 String message = "Unsupported protocol found when creating the proxy " +
111 "connection to ResourceManager: " +
112 ((protocol != null) ? protocol.getClass().getName() : "null");
113 LOG.error(message);
114 throw new IllegalStateException(message);
115 }
116 }
117
118 @InterfaceAudience.Private
119 @Override
120 protected void checkAllowedProtocols(Class<?> protocol) {
121 Preconditions.checkArgument(
122 protocol.isAssignableFrom(ClientRMProtocols.class),
123 "RM does not support this client protocol");
124 }
125
126 /**
127 * Get the token service name to be used for RMDelegationToken. Depending
128 * on whether HA is enabled or not, this method generates the appropriate
129 * service name as a comma-separated list of service addresses.
130 *
131 * @param conf Configuration corresponding to the cluster we need the
132 * RMDelegationToken for
133 * @return - Service name for RMDelegationToken
134 */
135 @InterfaceStability.Unstable
136 public static Text getRMDelegationTokenService(Configuration conf) {
137 if (HAUtil.isHAEnabled(conf)) {
138 // Build a list of service addresses to form the service name
139 ArrayList<String> services = new ArrayList<String>();
140 YarnConfiguration yarnConf = new YarnConfiguration(conf);
141 for (String rmId : HAUtil.getRMHAIds(conf)) {
142 // Set RM_ID to get the corresponding RM_ADDRESS
143 yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
144 services.add(SecurityUtil.buildTokenService(
145 yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
146 YarnConfiguration.DEFAULT_RM_ADDRESS,
147 YarnConfiguration.DEFAULT_RM_PORT)).toString());
148 }
149 return new Text(Joiner.on(',').join(services));
150 }
151
152 // Non-HA case - no need to set RM_ID
153 return SecurityUtil.buildTokenService(
154 conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
155 YarnConfiguration.DEFAULT_RM_ADDRESS,
156 YarnConfiguration.DEFAULT_RM_PORT));
157 }
158 }