001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.lang.reflect.InvocationHandler; 023import java.lang.reflect.Proxy; 024import java.net.InetSocketAddress; 025import java.net.URI; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.concurrent.atomic.AtomicBoolean; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import com.google.common.annotations.VisibleForTesting; 035import com.google.common.base.Preconditions; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 039import org.apache.hadoop.hdfs.client.impl.DfsClientConf; 040import org.apache.hadoop.hdfs.protocol.ClientProtocol; 041import org.apache.hadoop.hdfs.protocol.HdfsConstants; 042import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; 043import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; 044import org.apache.hadoop.hdfs.server.namenode.SafeModeException; 045import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; 046import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider; 047import org.apache.hadoop.io.Text; 048import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; 049import org.apache.hadoop.io.retry.FailoverProxyProvider; 050import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; 051import org.apache.hadoop.io.retry.RetryPolicies; 052import org.apache.hadoop.io.retry.RetryPolicy; 053import org.apache.hadoop.io.retry.RetryProxy; 054import org.apache.hadoop.io.retry.RetryUtils; 055import org.apache.hadoop.ipc.ProtobufRpcEngine; 056import org.apache.hadoop.ipc.RPC; 057import org.apache.hadoop.net.NetUtils; 058import org.apache.hadoop.security.SecurityUtil; 059import org.apache.hadoop.security.UserGroupInformation; 060 061/** 062 * Create proxy objects with {@link ClientProtocol} to communicate with a remote 063 * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol( 064 * Configuration, URI, AtomicBoolean)}, which will create either an HA- or 065 * non-HA-enabled client proxy as appropriate. 066 * 067 * For creating proxy objects with other protocols, please see 068 * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}. 069 */ 070@InterfaceAudience.Private 071public class NameNodeProxiesClient { 072 073 private static final Logger LOG = LoggerFactory.getLogger( 074 NameNodeProxiesClient.class); 075 076 /** 077 * Wrapper for a client proxy as well as its associated service ID. 078 * This is simply used as a tuple-like return type for created NN proxy. 079 */ 080 public static class ProxyAndInfo<PROXYTYPE> { 081 private final PROXYTYPE proxy; 082 private final Text dtService; 083 private final InetSocketAddress address; 084 085 public ProxyAndInfo(PROXYTYPE proxy, Text dtService, 086 InetSocketAddress address) { 087 this.proxy = proxy; 088 this.dtService = dtService; 089 this.address = address; 090 } 091 092 public PROXYTYPE getProxy() { 093 return proxy; 094 } 095 096 public Text getDelegationTokenService() { 097 return dtService; 098 } 099 100 public InetSocketAddress getAddress() { 101 return address; 102 } 103 } 104 105 /** 106 * Creates the namenode proxy with the ClientProtocol. This will handle 107 * creation of either HA- or non-HA-enabled proxy objects, depending upon 108 * if the provided URI is a configured logical URI. 109 * 110 * @param conf the configuration containing the required IPC 111 * properties, client failover configurations, etc. 112 * @param nameNodeUri the URI pointing either to a specific NameNode 113 * or to a logical nameservice. 114 * @param fallbackToSimpleAuth set to true or false during calls to indicate 115 * if a secure client falls back to simple auth 116 * @return an object containing both the proxy and the associated 117 * delegation token service it corresponds to 118 * @throws IOException if there is an error creating the proxy 119 * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}. 120 */ 121 public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol( 122 Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) 123 throws IOException { 124 AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = 125 createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, 126 true, fallbackToSimpleAuth); 127 128 if (failoverProxyProvider == null) { 129 InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri); 130 Text dtService = SecurityUtil.buildTokenService(nnAddr); 131 ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, 132 UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); 133 return new ProxyAndInfo<>(proxy, dtService, nnAddr); 134 } else { 135 return createHAProxy(conf, nameNodeUri, ClientProtocol.class, 136 failoverProxyProvider); 137 } 138 } 139 140 /** 141 * Generate a dummy namenode proxy instance that utilizes our hacked 142 * {@link LossyRetryInvocationHandler}. Proxy instance generated using this 143 * method will proactively drop RPC responses. Currently this method only 144 * support HA setup. null will be returned if the given configuration is not 145 * for HA. 146 * 147 * @param config the configuration containing the required IPC 148 * properties, client failover configurations, etc. 149 * @param nameNodeUri the URI pointing either to a specific NameNode 150 * or to a logical nameservice. 151 * @param xface the IPC interface which should be created 152 * @param numResponseToDrop The number of responses to drop for each RPC call 153 * @param fallbackToSimpleAuth set to true or false during calls to indicate 154 * if a secure client falls back to simple auth 155 * @return an object containing both the proxy and the associated 156 * delegation token service it corresponds to. Will return null of the 157 * given configuration does not support HA. 158 * @throws IOException if there is an error creating the proxy 159 */ 160 public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( 161 Configuration config, URI nameNodeUri, Class<T> xface, 162 int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) 163 throws IOException { 164 Preconditions.checkArgument(numResponseToDrop > 0); 165 AbstractNNFailoverProxyProvider<T> failoverProxyProvider = 166 createFailoverProxyProvider(config, nameNodeUri, xface, true, 167 fallbackToSimpleAuth); 168 169 if (failoverProxyProvider != null) { // HA case 170 int delay = config.getInt( 171 HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, 172 HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); 173 int maxCap = config.getInt( 174 HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, 175 HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); 176 int maxFailoverAttempts = config.getInt( 177 HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 178 HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); 179 int maxRetryAttempts = config.getInt( 180 HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 181 HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); 182 InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>( 183 numResponseToDrop, failoverProxyProvider, 184 RetryPolicies.failoverOnNetworkException( 185 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 186 Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, 187 maxCap)); 188 189 @SuppressWarnings("unchecked") 190 T proxy = (T) Proxy.newProxyInstance( 191 failoverProxyProvider.getInterface().getClassLoader(), 192 new Class[]{xface}, dummyHandler); 193 Text dtService; 194 if (failoverProxyProvider.useLogicalURI()) { 195 dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, 196 HdfsConstants.HDFS_URI_SCHEME); 197 } else { 198 dtService = SecurityUtil.buildTokenService( 199 DFSUtilClient.getNNAddress(nameNodeUri)); 200 } 201 return new ProxyAndInfo<>(proxy, dtService, 202 DFSUtilClient.getNNAddress(nameNodeUri)); 203 } else { 204 LOG.warn("Currently creating proxy using " + 205 "LossyRetryInvocationHandler requires NN HA setup"); 206 return null; 207 } 208 } 209 210 /** Creates the Failover proxy provider instance*/ 211 @VisibleForTesting 212 public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( 213 Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, 214 AtomicBoolean fallbackToSimpleAuth) throws IOException { 215 Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; 216 AbstractNNFailoverProxyProvider<T> providerNN; 217 try { 218 // Obtain the class of the proxy provider 219 failoverProxyProviderClass = getFailoverProxyProviderClass(conf, 220 nameNodeUri); 221 if (failoverProxyProviderClass == null) { 222 return null; 223 } 224 // Create a proxy provider instance. 225 Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass 226 .getConstructor(Configuration.class, URI.class, Class.class); 227 FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, 228 xface); 229 230 // If the proxy provider is of an old implementation, wrap it. 231 if (!(provider instanceof AbstractNNFailoverProxyProvider)) { 232 providerNN = new WrappedFailoverProxyProvider<>(provider); 233 } else { 234 providerNN = (AbstractNNFailoverProxyProvider<T>)provider; 235 } 236 } catch (Exception e) { 237 final String message = "Couldn't create proxy provider " + 238 failoverProxyProviderClass; 239 LOG.debug(message, e); 240 if (e.getCause() instanceof IOException) { 241 throw (IOException) e.getCause(); 242 } else { 243 throw new IOException(message, e); 244 } 245 } 246 247 // Check the port in the URI, if it is logical. 248 if (checkPort && providerNN.useLogicalURI()) { 249 int port = nameNodeUri.getPort(); 250 if (port > 0 && 251 port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) { 252 // Throwing here without any cleanup is fine since we have not 253 // actually created the underlying proxies yet. 254 throw new IOException("Port " + port + " specified in URI " 255 + nameNodeUri + " but host '" + nameNodeUri.getHost() 256 + "' is a logical (HA) namenode" 257 + " and does not use port information."); 258 } 259 } 260 providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); 261 return providerNN; 262 } 263 264 /** Gets the configured Failover proxy provider's class */ 265 @VisibleForTesting 266 public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( 267 Configuration conf, URI nameNodeUri) throws IOException { 268 if (nameNodeUri == null) { 269 return null; 270 } 271 String host = nameNodeUri.getHost(); 272 String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX 273 + "." + host; 274 try { 275 @SuppressWarnings("unchecked") 276 Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) 277 conf.getClass(configKey, null, FailoverProxyProvider.class); 278 return ret; 279 } catch (RuntimeException e) { 280 if (e.getCause() instanceof ClassNotFoundException) { 281 throw new IOException("Could not load failover proxy provider class " 282 + conf.get(configKey) + " which is configured for authority " 283 + nameNodeUri, e); 284 } else { 285 throw e; 286 } 287 } 288 } 289 290 /** 291 * Creates an explicitly HA-enabled proxy object. 292 * 293 * @param conf the configuration object 294 * @param nameNodeUri the URI pointing either to a specific NameNode or to a 295 * logical nameservice. 296 * @param xface the IPC interface which should be created 297 * @param failoverProxyProvider Failover proxy provider 298 * @return an object containing both the proxy and the associated 299 * delegation token service it corresponds to 300 */ 301 @SuppressWarnings("unchecked") 302 public static <T> ProxyAndInfo<T> createHAProxy( 303 Configuration conf, URI nameNodeUri, Class<T> xface, 304 AbstractNNFailoverProxyProvider<T> failoverProxyProvider) { 305 Preconditions.checkNotNull(failoverProxyProvider); 306 // HA case 307 DfsClientConf config = new DfsClientConf(conf); 308 T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, 309 RetryPolicies.failoverOnNetworkException( 310 RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(), 311 config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(), 312 config.getFailoverSleepMaxMillis())); 313 314 Text dtService; 315 if (failoverProxyProvider.useLogicalURI()) { 316 dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri, 317 HdfsConstants.HDFS_URI_SCHEME); 318 } else { 319 dtService = SecurityUtil.buildTokenService( 320 DFSUtilClient.getNNAddress(nameNodeUri)); 321 } 322 return new ProxyAndInfo<>(proxy, dtService, 323 DFSUtilClient.getNNAddressCheckLogical(conf, nameNodeUri)); 324 } 325 326 public static ClientProtocol createNonHAProxyWithClientProtocol( 327 InetSocketAddress address, Configuration conf, UserGroupInformation ugi, 328 boolean withRetries, AtomicBoolean fallbackToSimpleAuth) 329 throws IOException { 330 RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, 331 ProtobufRpcEngine.class); 332 333 final RetryPolicy defaultPolicy = 334 RetryUtils.getDefaultRetryPolicy( 335 conf, 336 HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, 337 HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, 338 HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, 339 HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, 340 SafeModeException.class.getName()); 341 342 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); 343 ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( 344 ClientNamenodeProtocolPB.class, version, address, ugi, conf, 345 NetUtils.getDefaultSocketFactory(conf), 346 org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, 347 fallbackToSimpleAuth).getProxy(); 348 349 if (withRetries) { // create the proxy with retries 350 Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>(); 351 ClientProtocol translatorProxy = 352 new ClientNamenodeProtocolTranslatorPB(proxy); 353 return (ClientProtocol) RetryProxy.create( 354 ClientProtocol.class, 355 new DefaultFailoverProxyProvider<>(ClientProtocol.class, 356 translatorProxy), 357 methodNameToPolicyMap, 358 defaultPolicy); 359 } else { 360 return new ClientNamenodeProtocolTranslatorPB(proxy); 361 } 362 } 363 364}