001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.net.ConnectException;
024 import java.net.InetSocketAddress;
025 import java.net.NoRouteToHostException;
026 import java.net.SocketException;
027 import java.net.UnknownHostException;
028 import java.security.PrivilegedAction;
029 import java.util.HashMap;
030 import java.util.Map;
031 import java.util.concurrent.TimeUnit;
032
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.hadoop.classification.InterfaceAudience;
036 import org.apache.hadoop.classification.InterfaceAudience.Private;
037 import org.apache.hadoop.classification.InterfaceStability;
038 import org.apache.hadoop.conf.Configuration;
039 import org.apache.hadoop.io.retry.RetryPolicies;
040 import org.apache.hadoop.io.retry.RetryPolicy;
041 import org.apache.hadoop.io.retry.RetryProxy;
042 import org.apache.hadoop.ipc.RetriableException;
043 import org.apache.hadoop.net.ConnectTimeoutException;
044 import org.apache.hadoop.security.UserGroupInformation;
045 import org.apache.hadoop.util.ReflectionUtils;
046 import org.apache.hadoop.yarn.conf.HAUtil;
047 import org.apache.hadoop.yarn.conf.YarnConfiguration;
048 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
049 import org.apache.hadoop.yarn.ipc.YarnRPC;
050
051 import com.google.common.annotations.VisibleForTesting;
052
053 @InterfaceAudience.Public
054 @InterfaceStability.Evolving
055 @SuppressWarnings("unchecked")
056 public class RMProxy<T> {
057
058 private static final Log LOG = LogFactory.getLog(RMProxy.class);
059
060 protected RMProxy() {}
061
062 /**
063 * Verify the passed protocol is supported.
064 */
065 @Private
066 protected void checkAllowedProtocols(Class<?> protocol) {}
067
068 /**
069 * Get the ResourceManager address from the provided Configuration for the
070 * given protocol.
071 */
072 @Private
073 protected InetSocketAddress getRMAddress(
074 YarnConfiguration conf, Class<?> protocol) throws IOException {
075 throw new UnsupportedOperationException("This method should be invoked " +
076 "from an instance of ClientRMProxy or ServerRMProxy");
077 }
078
079 /**
080 * Create a proxy for the specified protocol. For non-HA,
081 * this is a direct connection to the ResourceManager address. When HA is
082 * enabled, the proxy handles the failover between the ResourceManagers as
083 * well.
084 */
085 @Private
086 protected static <T> T createRMProxy(final Configuration configuration,
087 final Class<T> protocol, RMProxy instance) throws IOException {
088 YarnConfiguration conf = (configuration instanceof YarnConfiguration)
089 ? (YarnConfiguration) configuration
090 : new YarnConfiguration(configuration);
091 RetryPolicy retryPolicy = createRetryPolicy(conf);
092 if (HAUtil.isHAEnabled(conf)) {
093 RMFailoverProxyProvider<T> provider =
094 instance.createRMFailoverProxyProvider(conf, protocol);
095 return (T) RetryProxy.create(protocol, provider, retryPolicy);
096 } else {
097 InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
098 LOG.info("Connecting to ResourceManager at " + rmAddress);
099 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
100 return (T) RetryProxy.create(protocol, proxy, retryPolicy);
101 }
102 }
103
104 /**
105 * @deprecated
106 * This method is deprecated and is not used by YARN internally any more.
107 * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
108 * ServerRMProxy#createRMProxy.
109 *
110 * Create a proxy to the ResourceManager at the specified address.
111 *
112 * @param conf Configuration to generate retry policy
113 * @param protocol Protocol for the proxy
114 * @param rmAddress Address of the ResourceManager
115 * @param <T> Type information of the proxy
116 * @return Proxy to the RM
117 * @throws IOException
118 */
119 @Deprecated
120 public static <T> T createRMProxy(final Configuration conf,
121 final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
122 RetryPolicy retryPolicy = createRetryPolicy(conf);
123 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
124 LOG.info("Connecting to ResourceManager at " + rmAddress);
125 return (T) RetryProxy.create(protocol, proxy, retryPolicy);
126 }
127
128 /**
129 * Get a proxy to the RM at the specified address. To be used to create a
130 * RetryProxy.
131 */
132 @Private
133 static <T> T getProxy(final Configuration conf,
134 final Class<T> protocol, final InetSocketAddress rmAddress)
135 throws IOException {
136 return UserGroupInformation.getCurrentUser().doAs(
137 new PrivilegedAction<T>() {
138 @Override
139 public T run() {
140 return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
141 }
142 });
143 }
144
145 /**
146 * Helper method to create FailoverProxyProvider.
147 */
148 private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
149 Configuration conf, Class<T> protocol) {
150 Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
151 try {
152 defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
153 Class.forName(
154 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
155 } catch (Exception e) {
156 throw new YarnRuntimeException("Invalid default failover provider class" +
157 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
158 }
159
160 RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
161 conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
162 defaultProviderClass, RMFailoverProxyProvider.class), conf);
163 provider.init(conf, (RMProxy<T>) this, protocol);
164 return provider;
165 }
166
167 /**
168 * Fetch retry policy from Configuration
169 */
170 @Private
171 @VisibleForTesting
172 public static RetryPolicy createRetryPolicy(Configuration conf) {
173 long rmConnectWaitMS =
174 conf.getLong(
175 YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
176 YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
177 long rmConnectionRetryIntervalMS =
178 conf.getLong(
179 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
180 YarnConfiguration
181 .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
182
183 boolean waitForEver = (rmConnectWaitMS == -1);
184 if (!waitForEver) {
185 if (rmConnectWaitMS < 0) {
186 throw new YarnRuntimeException("Invalid Configuration. "
187 + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
188 + " can be -1, but can not be other negative numbers");
189 }
190
191 // try connect once
192 if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
193 LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
194 + " is smaller than "
195 + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
196 + ". Only try connect once.");
197 rmConnectWaitMS = 0;
198 }
199 }
200
201 // Handle HA case first
202 if (HAUtil.isHAEnabled(conf)) {
203 final long failoverSleepBaseMs = conf.getLong(
204 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
205 rmConnectionRetryIntervalMS);
206
207 final long failoverSleepMaxMs = conf.getLong(
208 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
209 rmConnectionRetryIntervalMS);
210
211 int maxFailoverAttempts = conf.getInt(
212 YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
213
214 if (maxFailoverAttempts == -1) {
215 if (waitForEver) {
216 maxFailoverAttempts = Integer.MAX_VALUE;
217 } else {
218 maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
219 }
220 }
221
222 return RetryPolicies.failoverOnNetworkException(
223 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
224 failoverSleepBaseMs, failoverSleepMaxMs);
225 }
226
227 if (waitForEver) {
228 return RetryPolicies.RETRY_FOREVER;
229 }
230
231 if (rmConnectionRetryIntervalMS < 0) {
232 throw new YarnRuntimeException("Invalid Configuration. " +
233 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
234 " should not be negative.");
235 }
236
237 RetryPolicy retryPolicy =
238 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
239 rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
240
241 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
242 new HashMap<Class<? extends Exception>, RetryPolicy>();
243
244 exceptionToPolicyMap.put(EOFException.class, retryPolicy);
245 exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
246 exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
247 exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
248 exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
249 exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
250 exceptionToPolicyMap.put(SocketException.class, retryPolicy);
251
252 return RetryPolicies.retryByException(
253 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
254 }
255 }