001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.lang.reflect.InvocationHandler;
023import java.lang.reflect.Proxy;
024import java.net.InetSocketAddress;
025import java.net.URI;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import com.google.common.annotations.VisibleForTesting;
035import com.google.common.base.Preconditions;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
039import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
040import org.apache.hadoop.hdfs.protocol.ClientProtocol;
041import org.apache.hadoop.hdfs.protocol.HdfsConstants;
042import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
043import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
044import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
045import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
046import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
047import org.apache.hadoop.io.Text;
048import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
049import org.apache.hadoop.io.retry.FailoverProxyProvider;
050import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
051import org.apache.hadoop.io.retry.RetryPolicies;
052import org.apache.hadoop.io.retry.RetryPolicy;
053import org.apache.hadoop.io.retry.RetryProxy;
054import org.apache.hadoop.io.retry.RetryUtils;
055import org.apache.hadoop.ipc.ProtobufRpcEngine;
056import org.apache.hadoop.ipc.RPC;
057import org.apache.hadoop.net.NetUtils;
058import org.apache.hadoop.security.SecurityUtil;
059import org.apache.hadoop.security.UserGroupInformation;
060
061/**
062 * Create proxy objects with {@link ClientProtocol} to communicate with a remote
063 * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
064 * Configuration, URI, AtomicBoolean)}, which will create either an HA- or
065 * non-HA-enabled client proxy as appropriate.
066 *
067 * For creating proxy objects with other protocols, please see
068 * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
069 */
070@InterfaceAudience.Private
071public class NameNodeProxiesClient {
072
073  private static final Logger LOG = LoggerFactory.getLogger(
074      NameNodeProxiesClient.class);
075
076  /**
077   * Wrapper for a client proxy as well as its associated service ID.
078   * This is simply used as a tuple-like return type for created NN proxy.
079   */
080  public static class ProxyAndInfo<PROXYTYPE> {
081    private final PROXYTYPE proxy;
082    private final Text dtService;
083    private final InetSocketAddress address;
084
085    public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
086                        InetSocketAddress address) {
087      this.proxy = proxy;
088      this.dtService = dtService;
089      this.address = address;
090    }
091
092    public PROXYTYPE getProxy() {
093      return proxy;
094    }
095
096    public Text getDelegationTokenService() {
097      return dtService;
098    }
099
100    public InetSocketAddress getAddress() {
101      return address;
102    }
103  }
104
105  /**
106   * Creates the namenode proxy with the ClientProtocol. This will handle
107   * creation of either HA- or non-HA-enabled proxy objects, depending upon
108   * if the provided URI is a configured logical URI.
109   *
110   * @param conf the configuration containing the required IPC
111   *        properties, client failover configurations, etc.
112   * @param nameNodeUri the URI pointing either to a specific NameNode
113   *        or to a logical nameservice.
114   * @param fallbackToSimpleAuth set to true or false during calls to indicate
115   *        if a secure client falls back to simple auth
116   * @return an object containing both the proxy and the associated
117   *         delegation token service it corresponds to
118   * @throws IOException if there is an error creating the proxy
119   * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
120   */
121  public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
122      Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
123      throws IOException {
124    AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
125        createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
126            true, fallbackToSimpleAuth);
127
128    if (failoverProxyProvider == null) {
129      InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
130      Text dtService = SecurityUtil.buildTokenService(nnAddr);
131      ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
132          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
133      return new ProxyAndInfo<>(proxy, dtService, nnAddr);
134    } else {
135      return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
136          failoverProxyProvider);
137    }
138  }
139
140  /**
141   * Generate a dummy namenode proxy instance that utilizes our hacked
142   * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
143   * method will proactively drop RPC responses. Currently this method only
144   * support HA setup. null will be returned if the given configuration is not
145   * for HA.
146   *
147   * @param config the configuration containing the required IPC
148   *        properties, client failover configurations, etc.
149   * @param nameNodeUri the URI pointing either to a specific NameNode
150   *        or to a logical nameservice.
151   * @param xface the IPC interface which should be created
152   * @param numResponseToDrop The number of responses to drop for each RPC call
153   * @param fallbackToSimpleAuth set to true or false during calls to indicate
154   *        if a secure client falls back to simple auth
155   * @return an object containing both the proxy and the associated
156   *         delegation token service it corresponds to. Will return null of the
157   *         given configuration does not support HA.
158   * @throws IOException if there is an error creating the proxy
159   */
160  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
161      Configuration config, URI nameNodeUri, Class<T> xface,
162      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
163      throws IOException {
164    Preconditions.checkArgument(numResponseToDrop > 0);
165    AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
166        createFailoverProxyProvider(config, nameNodeUri, xface, true,
167            fallbackToSimpleAuth);
168
169    if (failoverProxyProvider != null) { // HA case
170      int delay = config.getInt(
171          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
172          HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
173      int maxCap = config.getInt(
174          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
175          HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
176      int maxFailoverAttempts = config.getInt(
177          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
178          HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
179      int maxRetryAttempts = config.getInt(
180          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
181          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
182      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<>(
183              numResponseToDrop, failoverProxyProvider,
184              RetryPolicies.failoverOnNetworkException(
185                  RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
186                  Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
187                  maxCap));
188
189      @SuppressWarnings("unchecked")
190      T proxy = (T) Proxy.newProxyInstance(
191          failoverProxyProvider.getInterface().getClassLoader(),
192          new Class[]{xface}, dummyHandler);
193      Text dtService;
194      if (failoverProxyProvider.useLogicalURI()) {
195        dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
196            HdfsConstants.HDFS_URI_SCHEME);
197      } else {
198        dtService = SecurityUtil.buildTokenService(
199            DFSUtilClient.getNNAddress(nameNodeUri));
200      }
201      return new ProxyAndInfo<>(proxy, dtService,
202          DFSUtilClient.getNNAddress(nameNodeUri));
203    } else {
204      LOG.warn("Currently creating proxy using " +
205          "LossyRetryInvocationHandler requires NN HA setup");
206      return null;
207    }
208  }
209
210  /** Creates the Failover proxy provider instance*/
211  @VisibleForTesting
212  public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
213      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
214      AtomicBoolean fallbackToSimpleAuth) throws IOException {
215    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
216    AbstractNNFailoverProxyProvider<T> providerNN;
217    try {
218      // Obtain the class of the proxy provider
219      failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
220          nameNodeUri);
221      if (failoverProxyProviderClass == null) {
222        return null;
223      }
224      // Create a proxy provider instance.
225      Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
226          .getConstructor(Configuration.class, URI.class, Class.class);
227      FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
228          xface);
229
230      // If the proxy provider is of an old implementation, wrap it.
231      if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
232        providerNN = new WrappedFailoverProxyProvider<>(provider);
233      } else {
234        providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
235      }
236    } catch (Exception e) {
237      final String message = "Couldn't create proxy provider " +
238          failoverProxyProviderClass;
239      LOG.debug(message, e);
240      if (e.getCause() instanceof IOException) {
241        throw (IOException) e.getCause();
242      } else {
243        throw new IOException(message, e);
244      }
245    }
246
247    // Check the port in the URI, if it is logical.
248    if (checkPort && providerNN.useLogicalURI()) {
249      int port = nameNodeUri.getPort();
250      if (port > 0 &&
251          port != HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT) {
252        // Throwing here without any cleanup is fine since we have not
253        // actually created the underlying proxies yet.
254        throw new IOException("Port " + port + " specified in URI "
255            + nameNodeUri + " but host '" + nameNodeUri.getHost()
256            + "' is a logical (HA) namenode"
257            + " and does not use port information.");
258      }
259    }
260    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
261    return providerNN;
262  }
263
264  /** Gets the configured Failover proxy provider's class */
265  @VisibleForTesting
266  public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
267      Configuration conf, URI nameNodeUri) throws IOException {
268    if (nameNodeUri == null) {
269      return null;
270    }
271    String host = nameNodeUri.getHost();
272    String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
273        + "." + host;
274    try {
275      @SuppressWarnings("unchecked")
276      Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>)
277          conf.getClass(configKey, null, FailoverProxyProvider.class);
278      return ret;
279    } catch (RuntimeException e) {
280      if (e.getCause() instanceof ClassNotFoundException) {
281        throw new IOException("Could not load failover proxy provider class "
282            + conf.get(configKey) + " which is configured for authority "
283            + nameNodeUri, e);
284      } else {
285        throw e;
286      }
287    }
288  }
289
290  /**
291   * Creates an explicitly HA-enabled proxy object.
292   *
293   * @param conf the configuration object
294   * @param nameNodeUri the URI pointing either to a specific NameNode or to a
295   *        logical nameservice.
296   * @param xface the IPC interface which should be created
297   * @param failoverProxyProvider Failover proxy provider
298   * @return an object containing both the proxy and the associated
299   *         delegation token service it corresponds to
300   */
301  @SuppressWarnings("unchecked")
302  public static <T> ProxyAndInfo<T> createHAProxy(
303      Configuration conf, URI nameNodeUri, Class<T> xface,
304      AbstractNNFailoverProxyProvider<T> failoverProxyProvider) {
305    Preconditions.checkNotNull(failoverProxyProvider);
306    // HA case
307    DfsClientConf config = new DfsClientConf(conf);
308    T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
309        RetryPolicies.failoverOnNetworkException(
310            RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
311            config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
312            config.getFailoverSleepMaxMillis()));
313
314    Text dtService;
315    if (failoverProxyProvider.useLogicalURI()) {
316      dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
317          HdfsConstants.HDFS_URI_SCHEME);
318    } else {
319      dtService = SecurityUtil.buildTokenService(
320          DFSUtilClient.getNNAddress(nameNodeUri));
321    }
322    return new ProxyAndInfo<>(proxy, dtService,
323        DFSUtilClient.getNNAddressCheckLogical(conf, nameNodeUri));
324  }
325
326  public static ClientProtocol createNonHAProxyWithClientProtocol(
327      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
328      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
329      throws IOException {
330    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
331        ProtobufRpcEngine.class);
332
333    final RetryPolicy defaultPolicy =
334        RetryUtils.getDefaultRetryPolicy(
335            conf,
336            HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
337            HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
338            HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
339            HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
340            SafeModeException.class.getName());
341
342    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
343    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
344        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
345        NetUtils.getDefaultSocketFactory(conf),
346        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
347        fallbackToSimpleAuth).getProxy();
348
349    if (withRetries) { // create the proxy with retries
350      Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
351      ClientProtocol translatorProxy =
352          new ClientNamenodeProtocolTranslatorPB(proxy);
353      return (ClientProtocol) RetryProxy.create(
354          ClientProtocol.class,
355          new DefaultFailoverProxyProvider<>(ClientProtocol.class,
356              translatorProxy),
357          methodNameToPolicyMap,
358          defaultPolicy);
359    } else {
360      return new ClientNamenodeProtocolTranslatorPB(proxy);
361    }
362  }
363
364}