001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.client;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.NoRouteToHostException;
026import java.net.SocketException;
027import java.net.UnknownHostException;
028import java.security.PrivilegedAction;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceAudience.Private;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.io.retry.RetryPolicies;
040import org.apache.hadoop.io.retry.RetryPolicy;
041import org.apache.hadoop.io.retry.RetryProxy;
042import org.apache.hadoop.ipc.RetriableException;
043import org.apache.hadoop.net.ConnectTimeoutException;
044import org.apache.hadoop.security.UserGroupInformation;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.hadoop.yarn.conf.HAUtil;
047import org.apache.hadoop.yarn.conf.YarnConfiguration;
048import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
049import org.apache.hadoop.yarn.ipc.YarnRPC;
050
051import com.google.common.annotations.VisibleForTesting;
052
053@InterfaceAudience.Public
054@InterfaceStability.Evolving
055@SuppressWarnings("unchecked")
056public class RMProxy<T> {
057
058  private static final Log LOG = LogFactory.getLog(RMProxy.class);
059
060  protected RMProxy() {}
061
062  /**
063   * Verify the passed protocol is supported.
064   */
065  @Private
066  protected void checkAllowedProtocols(Class<?> protocol) {}
067
068  /**
069   * Get the ResourceManager address from the provided Configuration for the
070   * given protocol.
071   */
072  @Private
073  protected InetSocketAddress getRMAddress(
074      YarnConfiguration conf, Class<?> protocol) throws IOException {
075    throw new UnsupportedOperationException("This method should be invoked " +
076        "from an instance of ClientRMProxy or ServerRMProxy");
077  }
078
079  /**
080   * Create a proxy for the specified protocol. For non-HA,
081   * this is a direct connection to the ResourceManager address. When HA is
082   * enabled, the proxy handles the failover between the ResourceManagers as
083   * well.
084   */
085  @Private
086  protected static <T> T createRMProxy(final Configuration configuration,
087      final Class<T> protocol, RMProxy instance) throws IOException {
088    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
089        ? (YarnConfiguration) configuration
090        : new YarnConfiguration(configuration);
091    RetryPolicy retryPolicy =
092        createRetryPolicy(conf);
093    return createRMProxy(conf, protocol, instance, retryPolicy);
094  }
095
096  /**
097   * Create a proxy for the specified protocol. For non-HA,
098   * this is a direct connection to the ResourceManager address. When HA is
099   * enabled, the proxy handles the failover between the ResourceManagers as
100   * well.
101   */
102  @Private
103  protected static <T> T createRMProxy(final Configuration configuration,
104      final Class<T> protocol, RMProxy instance, final long retryTime,
105      final long retryInterval) throws IOException {
106    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
107        ? (YarnConfiguration) configuration
108        : new YarnConfiguration(configuration);
109    RetryPolicy retryPolicy =
110        createRetryPolicy(conf, retryTime, retryInterval);
111    return createRMProxy(conf, protocol, instance, retryPolicy);
112  }
113
114  private static <T> T createRMProxy(final YarnConfiguration conf,
115      final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
116          throws IOException{
117    if (HAUtil.isHAEnabled(conf)) {
118      RMFailoverProxyProvider<T> provider =
119          instance.createRMFailoverProxyProvider(conf, protocol);
120      return (T) RetryProxy.create(protocol, provider, retryPolicy);
121    } else {
122      InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
123      LOG.info("Connecting to ResourceManager at " + rmAddress);
124      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
125      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
126    }
127  }
128
129  /**
130   * @deprecated
131   * This method is deprecated and is not used by YARN internally any more.
132   * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
133   * ServerRMProxy#createRMProxy.
134   *
135   * Create a proxy to the ResourceManager at the specified address.
136   *
137   * @param conf Configuration to generate retry policy
138   * @param protocol Protocol for the proxy
139   * @param rmAddress Address of the ResourceManager
140   * @param <T> Type information of the proxy
141   * @return Proxy to the RM
142   * @throws IOException
143   */
144  @Deprecated
145  public static <T> T createRMProxy(final Configuration conf,
146      final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
147    RetryPolicy retryPolicy = createRetryPolicy(conf);
148    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
149    LOG.info("Connecting to ResourceManager at " + rmAddress);
150    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
151  }
152
153  /**
154   * Get a proxy to the RM at the specified address. To be used to create a
155   * RetryProxy.
156   */
157  @Private
158  static <T> T getProxy(final Configuration conf,
159      final Class<T> protocol, final InetSocketAddress rmAddress)
160      throws IOException {
161    return UserGroupInformation.getCurrentUser().doAs(
162      new PrivilegedAction<T>() {
163        @Override
164        public T run() {
165          return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
166        }
167      });
168  }
169
170  /**
171   * Helper method to create FailoverProxyProvider.
172   */
173  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
174      Configuration conf, Class<T> protocol) {
175    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
176    try {
177      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
178          Class.forName(
179              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
180    } catch (Exception e) {
181      throw new YarnRuntimeException("Invalid default failover provider class" +
182          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
183    }
184
185    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
186        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
187            defaultProviderClass, RMFailoverProxyProvider.class), conf);
188    provider.init(conf, (RMProxy<T>) this, protocol);
189    return provider;
190  }
191
192  /**
193   * Fetch retry policy from Configuration
194   */
195  @Private
196  @VisibleForTesting
197  public static RetryPolicy createRetryPolicy(Configuration conf) {
198    long rmConnectWaitMS =
199        conf.getLong(
200            YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
201            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
202    long rmConnectionRetryIntervalMS =
203        conf.getLong(
204            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
205            YarnConfiguration
206                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
207    return createRetryPolicy(
208        conf, rmConnectWaitMS, rmConnectionRetryIntervalMS);
209  }
210
211  /**
212   * Fetch retry policy from Configuration and create the
213   * retry policy with specified retryTime and retry interval.
214   */
215  private static RetryPolicy createRetryPolicy(Configuration conf,
216      long retryTime, long retryInterval) {
217    long rmConnectWaitMS = retryTime;
218    long rmConnectionRetryIntervalMS = retryInterval;
219
220    boolean waitForEver = (rmConnectWaitMS == -1);
221    if (!waitForEver) {
222      if (rmConnectWaitMS < 0) {
223        throw new YarnRuntimeException("Invalid Configuration. "
224            + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
225            + " can be -1, but can not be other negative numbers");
226      }
227
228      // try connect once
229      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
230        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
231            + " is smaller than "
232            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
233            + ". Only try connect once.");
234        rmConnectWaitMS = 0;
235      }
236    }
237
238    // Handle HA case first
239    if (HAUtil.isHAEnabled(conf)) {
240      final long failoverSleepBaseMs = conf.getLong(
241          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
242          rmConnectionRetryIntervalMS);
243
244      final long failoverSleepMaxMs = conf.getLong(
245          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
246          rmConnectionRetryIntervalMS);
247
248      int maxFailoverAttempts = conf.getInt(
249          YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
250
251      if (maxFailoverAttempts == -1) {
252        if (waitForEver) {
253          maxFailoverAttempts = Integer.MAX_VALUE;
254        } else {
255          maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
256        }
257      }
258
259      return RetryPolicies.failoverOnNetworkException(
260          RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
261          failoverSleepBaseMs, failoverSleepMaxMs);
262    }
263
264    if (rmConnectionRetryIntervalMS < 0) {
265      throw new YarnRuntimeException("Invalid Configuration. " +
266          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
267          " should not be negative.");
268    }
269
270    RetryPolicy retryPolicy = null;
271    if (waitForEver) {
272      retryPolicy = RetryPolicies.retryForeverWithFixedSleep(
273          rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
274    } else {
275      retryPolicy =
276          RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
277              rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
278    }
279
280    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
281        new HashMap<Class<? extends Exception>, RetryPolicy>();
282
283    exceptionToPolicyMap.put(EOFException.class, retryPolicy);
284    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
285    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
286    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
287    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
288    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
289    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
290    // YARN-4288: local IOException is also possible.
291    exceptionToPolicyMap.put(IOException.class, retryPolicy);
292    // Not retry on remote IO exception.
293    return RetryPolicies.retryOtherThanRemoteException(
294        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
295  }
296}