001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client; 020 021 import java.io.IOException; 022 import java.net.InetSocketAddress; 023 import java.util.ArrayList; 024 025 import com.google.common.base.Joiner; 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.hadoop.classification.InterfaceAudience; 029 import org.apache.hadoop.classification.InterfaceStability; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.io.Text; 032 import org.apache.hadoop.security.SecurityUtil; 033 import org.apache.hadoop.security.UserGroupInformation; 034 import org.apache.hadoop.security.token.Token; 035 import org.apache.hadoop.security.token.TokenIdentifier; 036 import org.apache.hadoop.yarn.api.ApplicationClientProtocol; 037 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 038 import org.apache.hadoop.yarn.conf.HAUtil; 039 import org.apache.hadoop.yarn.conf.YarnConfiguration; 040 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 041 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; 042 043 import com.google.common.base.Preconditions; 044 045 @InterfaceAudience.Public 046 @InterfaceStability.Stable 047 public class ClientRMProxy<T> extends RMProxy<T> { 048 private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); 049 private static final ClientRMProxy INSTANCE = new ClientRMProxy(); 050 051 private interface ClientRMProtocols extends ApplicationClientProtocol, 052 ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { 053 // Add nothing 054 } 055 056 private ClientRMProxy(){ 057 super(); 058 } 059 060 /** 061 * Create a proxy to the ResourceManager for the specified protocol. 062 * @param configuration Configuration with all the required information. 063 * @param protocol Client protocol for which proxy is being requested. 064 * @param <T> Type of proxy. 065 * @return Proxy to the ResourceManager for the specified client protocol. 066 * @throws IOException 067 */ 068 public static <T> T createRMProxy(final Configuration configuration, 069 final Class<T> protocol) throws IOException { 070 return createRMProxy(configuration, protocol, INSTANCE); 071 } 072 073 private static void setupTokens(InetSocketAddress resourceManagerAddress) 074 throws IOException { 075 // It is assumed for now that the only AMRMToken in AM's UGI is for this 076 // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as 077 // default service-address, see YARN-1779. 078 for (Token<? extends TokenIdentifier> token : UserGroupInformation 079 .getCurrentUser().getTokens()) { 080 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 081 // This token needs to be directly provided to the AMs, so set the 082 // appropriate service-name. We'll need more infrastructure when we 083 // need to set it in HA case. 084 SecurityUtil.setTokenService(token, resourceManagerAddress); 085 } 086 } 087 } 088 089 @InterfaceAudience.Private 090 @Override 091 protected InetSocketAddress getRMAddress(YarnConfiguration conf, 092 Class<?> protocol) throws IOException { 093 if (protocol == ApplicationClientProtocol.class) { 094 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 095 YarnConfiguration.DEFAULT_RM_ADDRESS, 096 YarnConfiguration.DEFAULT_RM_PORT); 097 } else if (protocol == ResourceManagerAdministrationProtocol.class) { 098 return conf.getSocketAddr( 099 YarnConfiguration.RM_ADMIN_ADDRESS, 100 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, 101 YarnConfiguration.DEFAULT_RM_ADMIN_PORT); 102 } else if (protocol == ApplicationMasterProtocol.class) { 103 InetSocketAddress serviceAddr = 104 conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, 105 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 106 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); 107 setupTokens(serviceAddr); 108 return serviceAddr; 109 } else { 110 String message = "Unsupported protocol found when creating the proxy " + 111 "connection to ResourceManager: " + 112 ((protocol != null) ? protocol.getClass().getName() : "null"); 113 LOG.error(message); 114 throw new IllegalStateException(message); 115 } 116 } 117 118 @InterfaceAudience.Private 119 @Override 120 protected void checkAllowedProtocols(Class<?> protocol) { 121 Preconditions.checkArgument( 122 protocol.isAssignableFrom(ClientRMProtocols.class), 123 "RM does not support this client protocol"); 124 } 125 126 /** 127 * Get the token service name to be used for RMDelegationToken. Depending 128 * on whether HA is enabled or not, this method generates the appropriate 129 * service name as a comma-separated list of service addresses. 130 * 131 * @param conf Configuration corresponding to the cluster we need the 132 * RMDelegationToken for 133 * @return - Service name for RMDelegationToken 134 */ 135 @InterfaceStability.Unstable 136 public static Text getRMDelegationTokenService(Configuration conf) { 137 if (HAUtil.isHAEnabled(conf)) { 138 // Build a list of service addresses to form the service name 139 ArrayList<String> services = new ArrayList<String>(); 140 YarnConfiguration yarnConf = new YarnConfiguration(conf); 141 for (String rmId : HAUtil.getRMHAIds(conf)) { 142 // Set RM_ID to get the corresponding RM_ADDRESS 143 yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); 144 services.add(SecurityUtil.buildTokenService( 145 yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 146 YarnConfiguration.DEFAULT_RM_ADDRESS, 147 YarnConfiguration.DEFAULT_RM_PORT)).toString()); 148 } 149 return new Text(Joiner.on(',').join(services)); 150 } 151 152 // Non-HA case - no need to set RM_ID 153 return SecurityUtil.buildTokenService( 154 conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 155 YarnConfiguration.DEFAULT_RM_ADDRESS, 156 YarnConfiguration.DEFAULT_RM_PORT)); 157 } 158 }