001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client;
020
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.util.ArrayList;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceAudience.Private;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.classification.InterfaceStability.Unstable;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.hadoop.io.Text;
033 import org.apache.hadoop.security.SecurityUtil;
034 import org.apache.hadoop.security.UserGroupInformation;
035 import org.apache.hadoop.security.token.Token;
036 import org.apache.hadoop.security.token.TokenIdentifier;
037 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
038 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
039 import org.apache.hadoop.yarn.conf.HAUtil;
040 import org.apache.hadoop.yarn.conf.YarnConfiguration;
041 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
042 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
043
044 import com.google.common.base.Joiner;
045 import com.google.common.base.Preconditions;
046
047 @InterfaceAudience.Public
048 @InterfaceStability.Stable
049 public class ClientRMProxy<T> extends RMProxy<T> {
050 private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
051 private static final ClientRMProxy INSTANCE = new ClientRMProxy();
052
053 private interface ClientRMProtocols extends ApplicationClientProtocol,
054 ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
055 // Add nothing
056 }
057
058 private ClientRMProxy(){
059 super();
060 }
061
062 /**
063 * Create a proxy to the ResourceManager for the specified protocol.
064 * @param configuration Configuration with all the required information.
065 * @param protocol Client protocol for which proxy is being requested.
066 * @param <T> Type of proxy.
067 * @return Proxy to the ResourceManager for the specified client protocol.
068 * @throws IOException
069 */
070 public static <T> T createRMProxy(final Configuration configuration,
071 final Class<T> protocol) throws IOException {
072 return createRMProxy(configuration, protocol, INSTANCE);
073 }
074
075 private static void setAMRMTokenService(final Configuration conf)
076 throws IOException {
077 for (Token<? extends TokenIdentifier> token : UserGroupInformation
078 .getCurrentUser().getTokens()) {
079 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
080 token.setService(getAMRMTokenService(conf));
081 }
082 }
083 }
084
085 @Private
086 @Override
087 protected InetSocketAddress getRMAddress(YarnConfiguration conf,
088 Class<?> protocol) throws IOException {
089 if (protocol == ApplicationClientProtocol.class) {
090 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
091 YarnConfiguration.DEFAULT_RM_ADDRESS,
092 YarnConfiguration.DEFAULT_RM_PORT);
093 } else if (protocol == ResourceManagerAdministrationProtocol.class) {
094 return conf.getSocketAddr(
095 YarnConfiguration.RM_ADMIN_ADDRESS,
096 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
097 YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
098 } else if (protocol == ApplicationMasterProtocol.class) {
099 setAMRMTokenService(conf);
100 return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
101 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
102 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
103 } else {
104 String message = "Unsupported protocol found when creating the proxy " +
105 "connection to ResourceManager: " +
106 ((protocol != null) ? protocol.getClass().getName() : "null");
107 LOG.error(message);
108 throw new IllegalStateException(message);
109 }
110 }
111
112 @Private
113 @Override
114 protected void checkAllowedProtocols(Class<?> protocol) {
115 Preconditions.checkArgument(
116 protocol.isAssignableFrom(ClientRMProtocols.class),
117 "RM does not support this client protocol");
118 }
119
120 /**
121 * Get the token service name to be used for RMDelegationToken. Depending
122 * on whether HA is enabled or not, this method generates the appropriate
123 * service name as a comma-separated list of service addresses.
124 *
125 * @param conf Configuration corresponding to the cluster we need the
126 * RMDelegationToken for
127 * @return - Service name for RMDelegationToken
128 */
129 @Unstable
130 public static Text getRMDelegationTokenService(Configuration conf) {
131 return getTokenService(conf, YarnConfiguration.RM_ADDRESS,
132 YarnConfiguration.DEFAULT_RM_ADDRESS,
133 YarnConfiguration.DEFAULT_RM_PORT);
134 }
135
136 @Unstable
137 public static Text getAMRMTokenService(Configuration conf) {
138 return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
139 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
140 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
141 }
142
143 @Unstable
144 public static Text getTokenService(Configuration conf, String address,
145 String defaultAddr, int defaultPort) {
146 if (HAUtil.isHAEnabled(conf)) {
147 // Build a list of service addresses to form the service name
148 ArrayList<String> services = new ArrayList<String>();
149 YarnConfiguration yarnConf = new YarnConfiguration(conf);
150 for (String rmId : HAUtil.getRMHAIds(conf)) {
151 // Set RM_ID to get the corresponding RM_ADDRESS
152 yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
153 services.add(SecurityUtil.buildTokenService(
154 yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
155 .toString());
156 }
157 return new Text(Joiner.on(',').join(services));
158 }
159
160 // Non-HA case - no need to set RM_ID
161 return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
162 defaultAddr, defaultPort));
163 }
164 }