001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.client; 020 021import java.io.IOException; 022import java.net.InetSocketAddress; 023import java.util.ArrayList; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceAudience.Private; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.classification.InterfaceStability.Unstable; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.io.Text; 033import org.apache.hadoop.security.SecurityUtil; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.hadoop.security.token.Token; 036import org.apache.hadoop.security.token.TokenIdentifier; 037import org.apache.hadoop.yarn.api.ApplicationClientProtocol; 038import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 039import org.apache.hadoop.yarn.conf.HAUtil; 040import org.apache.hadoop.yarn.conf.YarnConfiguration; 041import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 042import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; 043 044import com.google.common.base.Joiner; 045import com.google.common.base.Preconditions; 046 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public class ClientRMProxy<T> extends RMProxy<T> { 050 private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); 051 private static final ClientRMProxy INSTANCE = new ClientRMProxy(); 052 053 private interface ClientRMProtocols extends ApplicationClientProtocol, 054 ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { 055 // Add nothing 056 } 057 058 private ClientRMProxy(){ 059 super(); 060 } 061 062 /** 063 * Create a proxy to the ResourceManager for the specified protocol. 064 * @param configuration Configuration with all the required information. 065 * @param protocol Client protocol for which proxy is being requested. 066 * @param <T> Type of proxy. 067 * @return Proxy to the ResourceManager for the specified client protocol. 068 * @throws IOException 069 */ 070 public static <T> T createRMProxy(final Configuration configuration, 071 final Class<T> protocol) throws IOException { 072 return createRMProxy(configuration, protocol, INSTANCE); 073 } 074 075 private static void setAMRMTokenService(final Configuration conf) 076 throws IOException { 077 for (Token<? extends TokenIdentifier> token : UserGroupInformation 078 .getCurrentUser().getTokens()) { 079 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 080 token.setService(getAMRMTokenService(conf)); 081 } 082 } 083 } 084 085 @Private 086 @Override 087 protected InetSocketAddress getRMAddress(YarnConfiguration conf, 088 Class<?> protocol) throws IOException { 089 if (protocol == ApplicationClientProtocol.class) { 090 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 091 YarnConfiguration.DEFAULT_RM_ADDRESS, 092 YarnConfiguration.DEFAULT_RM_PORT); 093 } else if (protocol == ResourceManagerAdministrationProtocol.class) { 094 return conf.getSocketAddr( 095 YarnConfiguration.RM_ADMIN_ADDRESS, 096 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, 097 YarnConfiguration.DEFAULT_RM_ADMIN_PORT); 098 } else if (protocol == ApplicationMasterProtocol.class) { 099 setAMRMTokenService(conf); 100 return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, 101 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 102 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); 103 } else { 104 String message = "Unsupported protocol found when creating the proxy " + 105 "connection to ResourceManager: " + 106 ((protocol != null) ? protocol.getClass().getName() : "null"); 107 LOG.error(message); 108 throw new IllegalStateException(message); 109 } 110 } 111 112 @Private 113 @Override 114 protected void checkAllowedProtocols(Class<?> protocol) { 115 Preconditions.checkArgument( 116 protocol.isAssignableFrom(ClientRMProtocols.class), 117 "RM does not support this client protocol"); 118 } 119 120 /** 121 * Get the token service name to be used for RMDelegationToken. Depending 122 * on whether HA is enabled or not, this method generates the appropriate 123 * service name as a comma-separated list of service addresses. 124 * 125 * @param conf Configuration corresponding to the cluster we need the 126 * RMDelegationToken for 127 * @return - Service name for RMDelegationToken 128 */ 129 @Unstable 130 public static Text getRMDelegationTokenService(Configuration conf) { 131 return getTokenService(conf, YarnConfiguration.RM_ADDRESS, 132 YarnConfiguration.DEFAULT_RM_ADDRESS, 133 YarnConfiguration.DEFAULT_RM_PORT); 134 } 135 136 @Unstable 137 public static Text getAMRMTokenService(Configuration conf) { 138 return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, 139 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 140 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); 141 } 142 143 @Unstable 144 public static Text getTokenService(Configuration conf, String address, 145 String defaultAddr, int defaultPort) { 146 if (HAUtil.isHAEnabled(conf)) { 147 // Build a list of service addresses to form the service name 148 ArrayList<String> services = new ArrayList<String>(); 149 YarnConfiguration yarnConf = new YarnConfiguration(conf); 150 for (String rmId : HAUtil.getRMHAIds(conf)) { 151 // Set RM_ID to get the corresponding RM_ADDRESS 152 yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); 153 services.add(SecurityUtil.buildTokenService( 154 yarnConf.getSocketAddr(address, defaultAddr, defaultPort)) 155 .toString()); 156 } 157 return new Text(Joiner.on(',').join(services)); 158 } 159 160 // Non-HA case - no need to set RM_ID 161 return SecurityUtil.buildTokenService(conf.getSocketAddr(address, 162 defaultAddr, defaultPort)); 163 } 164}