001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.client;
020    
021    import java.io.IOException;
022    import java.net.ConnectException;
023    import java.net.InetSocketAddress;
024    import java.security.PrivilegedAction;
025    import java.util.HashMap;
026    import java.util.Map;
027    import java.util.concurrent.TimeUnit;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.hadoop.classification.InterfaceAudience;
032    import org.apache.hadoop.classification.InterfaceAudience.Private;
033    import org.apache.hadoop.classification.InterfaceStability;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.io.retry.RetryPolicies;
036    import org.apache.hadoop.io.retry.RetryPolicy;
037    import org.apache.hadoop.io.retry.RetryProxy;
038    import org.apache.hadoop.security.UserGroupInformation;
039    import org.apache.hadoop.util.ReflectionUtils;
040    import org.apache.hadoop.yarn.conf.HAUtil;
041    import org.apache.hadoop.yarn.conf.YarnConfiguration;
042    import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
043    import org.apache.hadoop.yarn.ipc.YarnRPC;
044    
045    import com.google.common.annotations.VisibleForTesting;
046    
047    @InterfaceAudience.Public
048    @InterfaceStability.Evolving
049    @SuppressWarnings("unchecked")
050    public class RMProxy<T> {
051    
052      private static final Log LOG = LogFactory.getLog(RMProxy.class);
053    
054      protected RMProxy() {}
055    
056      /**
057       * Verify the passed protocol is supported.
058       */
059      @Private
060      protected void checkAllowedProtocols(Class<?> protocol) {}
061    
062      /**
063       * Get the ResourceManager address from the provided Configuration for the
064       * given protocol.
065       */
066      @Private
067      protected InetSocketAddress getRMAddress(
068          YarnConfiguration conf, Class<?> protocol) throws IOException {
069        throw new UnsupportedOperationException("This method should be invoked " +
070            "from an instance of ClientRMProxy or ServerRMProxy");
071      }
072    
073      /**
074       * Create a proxy for the specified protocol. For non-HA,
075       * this is a direct connection to the ResourceManager address. When HA is
076       * enabled, the proxy handles the failover between the ResourceManagers as
077       * well.
078       */
079      @Private
080      protected static <T> T createRMProxy(final Configuration configuration,
081          final Class<T> protocol, RMProxy instance) throws IOException {
082        YarnConfiguration conf = (configuration instanceof YarnConfiguration)
083            ? (YarnConfiguration) configuration
084            : new YarnConfiguration(configuration);
085        RetryPolicy retryPolicy = createRetryPolicy(conf);
086        if (HAUtil.isHAEnabled(conf)) {
087          RMFailoverProxyProvider<T> provider =
088              instance.createRMFailoverProxyProvider(conf, protocol);
089          return (T) RetryProxy.create(protocol, provider, retryPolicy);
090        } else {
091          InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
092          LOG.info("Connecting to ResourceManager at " + rmAddress);
093          T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
094          return (T) RetryProxy.create(protocol, proxy, retryPolicy);
095        }
096      }
097    
098      /**
099       * @deprecated
100       * This method is deprecated and is not used by YARN internally any more.
101       * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
102       * ServerRMProxy#createRMProxy.
103       *
104       * Create a proxy to the ResourceManager at the specified address.
105       *
106       * @param conf Configuration to generate retry policy
107       * @param protocol Protocol for the proxy
108       * @param rmAddress Address of the ResourceManager
109       * @param <T> Type information of the proxy
110       * @return Proxy to the RM
111       * @throws IOException
112       */
113      @Deprecated
114      public static <T> T createRMProxy(final Configuration conf,
115          final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
116        RetryPolicy retryPolicy = createRetryPolicy(conf);
117        T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
118        LOG.info("Connecting to ResourceManager at " + rmAddress);
119        return (T) RetryProxy.create(protocol, proxy, retryPolicy);
120      }
121    
122      /**
123       * Get a proxy to the RM at the specified address. To be used to create a
124       * RetryProxy.
125       */
126      @Private
127      static <T> T getProxy(final Configuration conf,
128          final Class<T> protocol, final InetSocketAddress rmAddress)
129          throws IOException {
130        return UserGroupInformation.getCurrentUser().doAs(
131          new PrivilegedAction<T>() {
132            @Override
133            public T run() {
134              return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
135            }
136          });
137      }
138    
139      /**
140       * Helper method to create FailoverProxyProvider.
141       */
142      private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
143          Configuration conf, Class<T> protocol) {
144        Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
145        try {
146          defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
147              Class.forName(
148                  YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
149        } catch (Exception e) {
150          throw new YarnRuntimeException("Invalid default failover provider class" +
151              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
152        }
153    
154        RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
155            conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
156                defaultProviderClass, RMFailoverProxyProvider.class), conf);
157        provider.init(conf, (RMProxy<T>) this, protocol);
158        return provider;
159      }
160    
161      /**
162       * Fetch retry policy from Configuration
163       */
164      @Private
165      @VisibleForTesting
166      public static RetryPolicy createRetryPolicy(Configuration conf) {
167        long rmConnectWaitMS =
168            conf.getInt(
169                YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
170                YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
171        long rmConnectionRetryIntervalMS =
172            conf.getLong(
173                YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
174                YarnConfiguration
175                    .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
176    
177        boolean waitForEver = (rmConnectWaitMS == -1);
178        if (!waitForEver) {
179          if (rmConnectWaitMS < 0) {
180            throw new YarnRuntimeException("Invalid Configuration. "
181                + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
182                + " can be -1, but can not be other negative numbers");
183          }
184    
185          // try connect once
186          if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
187            LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
188                + " is smaller than "
189                + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
190                + ". Only try connect once.");
191            rmConnectWaitMS = 0;
192          }
193        }
194    
195        // Handle HA case first
196        if (HAUtil.isHAEnabled(conf)) {
197          final long failoverSleepBaseMs = conf.getLong(
198              YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
199              rmConnectionRetryIntervalMS);
200    
201          final long failoverSleepMaxMs = conf.getLong(
202              YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
203              rmConnectionRetryIntervalMS);
204    
205          int maxFailoverAttempts = conf.getInt(
206              YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
207    
208          if (maxFailoverAttempts == -1) {
209            if (waitForEver) {
210              maxFailoverAttempts = Integer.MAX_VALUE;
211            } else {
212              maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
213            }
214          }
215    
216          return RetryPolicies.failoverOnNetworkException(
217              RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
218              failoverSleepBaseMs, failoverSleepMaxMs);
219        }
220    
221        if (waitForEver) {
222          return RetryPolicies.RETRY_FOREVER;
223        }
224    
225        if (rmConnectionRetryIntervalMS < 0) {
226          throw new YarnRuntimeException("Invalid Configuration. " +
227              YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
228              " should not be negative.");
229        }
230    
231        RetryPolicy retryPolicy =
232            RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
233                rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
234    
235        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
236            new HashMap<Class<? extends Exception>, RetryPolicy>();
237        exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
238        //TO DO: after HADOOP-9576,  IOException can be changed to EOFException
239        exceptionToPolicyMap.put(IOException.class, retryPolicy);
240        return RetryPolicies.retryByException(
241            RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
242      }
243    }