001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.client;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.NoRouteToHostException;
026import java.net.SocketException;
027import java.net.UnknownHostException;
028import java.security.PrivilegedAction;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceAudience.Private;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.io.retry.RetryPolicies;
040import org.apache.hadoop.io.retry.RetryPolicy;
041import org.apache.hadoop.io.retry.RetryProxy;
042import org.apache.hadoop.ipc.RetriableException;
043import org.apache.hadoop.net.ConnectTimeoutException;
044import org.apache.hadoop.security.UserGroupInformation;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.hadoop.yarn.conf.HAUtil;
047import org.apache.hadoop.yarn.conf.YarnConfiguration;
048import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
049import org.apache.hadoop.yarn.ipc.YarnRPC;
050
051import com.google.common.annotations.VisibleForTesting;
052
053@InterfaceAudience.Public
054@InterfaceStability.Evolving
055@SuppressWarnings("unchecked")
056public class RMProxy<T> {
057
058  private static final Log LOG = LogFactory.getLog(RMProxy.class);
059
060  protected RMProxy() {}
061
062  /**
063   * Verify the passed protocol is supported.
064   */
065  @Private
066  protected void checkAllowedProtocols(Class<?> protocol) {}
067
068  /**
069   * Get the ResourceManager address from the provided Configuration for the
070   * given protocol.
071   */
072  @Private
073  protected InetSocketAddress getRMAddress(
074      YarnConfiguration conf, Class<?> protocol) throws IOException {
075    throw new UnsupportedOperationException("This method should be invoked " +
076        "from an instance of ClientRMProxy or ServerRMProxy");
077  }
078
079  /**
080   * Create a proxy for the specified protocol. For non-HA,
081   * this is a direct connection to the ResourceManager address. When HA is
082   * enabled, the proxy handles the failover between the ResourceManagers as
083   * well.
084   */
085  @Private
086  protected static <T> T createRMProxy(final Configuration configuration,
087      final Class<T> protocol, RMProxy instance) throws IOException {
088    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
089        ? (YarnConfiguration) configuration
090        : new YarnConfiguration(configuration);
091    RetryPolicy retryPolicy = createRetryPolicy(conf);
092    if (HAUtil.isHAEnabled(conf)) {
093      RMFailoverProxyProvider<T> provider =
094          instance.createRMFailoverProxyProvider(conf, protocol);
095      return (T) RetryProxy.create(protocol, provider, retryPolicy);
096    } else {
097      InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
098      LOG.info("Connecting to ResourceManager at " + rmAddress);
099      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
100      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
101    }
102  }
103
104  /**
105   * @deprecated
106   * This method is deprecated and is not used by YARN internally any more.
107   * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
108   * ServerRMProxy#createRMProxy.
109   *
110   * Create a proxy to the ResourceManager at the specified address.
111   *
112   * @param conf Configuration to generate retry policy
113   * @param protocol Protocol for the proxy
114   * @param rmAddress Address of the ResourceManager
115   * @param <T> Type information of the proxy
116   * @return Proxy to the RM
117   * @throws IOException
118   */
119  @Deprecated
120  public static <T> T createRMProxy(final Configuration conf,
121      final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
122    RetryPolicy retryPolicy = createRetryPolicy(conf);
123    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
124    LOG.info("Connecting to ResourceManager at " + rmAddress);
125    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
126  }
127
128  /**
129   * Get a proxy to the RM at the specified address. To be used to create a
130   * RetryProxy.
131   */
132  @Private
133  static <T> T getProxy(final Configuration conf,
134      final Class<T> protocol, final InetSocketAddress rmAddress)
135      throws IOException {
136    return UserGroupInformation.getCurrentUser().doAs(
137      new PrivilegedAction<T>() {
138        @Override
139        public T run() {
140          return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
141        }
142      });
143  }
144
145  /**
146   * Helper method to create FailoverProxyProvider.
147   */
148  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
149      Configuration conf, Class<T> protocol) {
150    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
151    try {
152      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
153          Class.forName(
154              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
155    } catch (Exception e) {
156      throw new YarnRuntimeException("Invalid default failover provider class" +
157          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
158    }
159
160    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
161        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
162            defaultProviderClass, RMFailoverProxyProvider.class), conf);
163    provider.init(conf, (RMProxy<T>) this, protocol);
164    return provider;
165  }
166
167  /**
168   * Fetch retry policy from Configuration
169   */
170  @Private
171  @VisibleForTesting
172  public static RetryPolicy createRetryPolicy(Configuration conf) {
173    long rmConnectWaitMS =
174        conf.getLong(
175            YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
176            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
177    long rmConnectionRetryIntervalMS =
178        conf.getLong(
179            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
180            YarnConfiguration
181                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
182
183    boolean waitForEver = (rmConnectWaitMS == -1);
184    if (!waitForEver) {
185      if (rmConnectWaitMS < 0) {
186        throw new YarnRuntimeException("Invalid Configuration. "
187            + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
188            + " can be -1, but can not be other negative numbers");
189      }
190
191      // try connect once
192      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
193        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
194            + " is smaller than "
195            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
196            + ". Only try connect once.");
197        rmConnectWaitMS = 0;
198      }
199    }
200
201    // Handle HA case first
202    if (HAUtil.isHAEnabled(conf)) {
203      final long failoverSleepBaseMs = conf.getLong(
204          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
205          rmConnectionRetryIntervalMS);
206
207      final long failoverSleepMaxMs = conf.getLong(
208          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
209          rmConnectionRetryIntervalMS);
210
211      int maxFailoverAttempts = conf.getInt(
212          YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
213
214      if (maxFailoverAttempts == -1) {
215        if (waitForEver) {
216          maxFailoverAttempts = Integer.MAX_VALUE;
217        } else {
218          maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
219        }
220      }
221
222      return RetryPolicies.failoverOnNetworkException(
223          RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
224          failoverSleepBaseMs, failoverSleepMaxMs);
225    }
226
227    if (waitForEver) {
228      return RetryPolicies.RETRY_FOREVER;
229    }
230
231    if (rmConnectionRetryIntervalMS < 0) {
232      throw new YarnRuntimeException("Invalid Configuration. " +
233          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
234          " should not be negative.");
235    }
236
237    RetryPolicy retryPolicy =
238        RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
239            rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
240
241    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
242        new HashMap<Class<? extends Exception>, RetryPolicy>();
243
244    exceptionToPolicyMap.put(EOFException.class, retryPolicy);
245    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
246    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
247    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
248    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
249    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
250    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
251
252    return RetryPolicies.retryByException(
253        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
254  }
255}