001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.yarn.client;
020
021 import java.io.IOException;
022 import java.net.ConnectException;
023 import java.net.InetSocketAddress;
024 import java.security.PrivilegedAction;
025 import java.util.HashMap;
026 import java.util.Map;
027 import java.util.concurrent.TimeUnit;
028
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031 import org.apache.hadoop.classification.InterfaceAudience;
032 import org.apache.hadoop.classification.InterfaceAudience.Private;
033 import org.apache.hadoop.classification.InterfaceStability;
034 import org.apache.hadoop.conf.Configuration;
035 import org.apache.hadoop.io.retry.RetryPolicies;
036 import org.apache.hadoop.io.retry.RetryPolicy;
037 import org.apache.hadoop.io.retry.RetryProxy;
038 import org.apache.hadoop.security.UserGroupInformation;
039 import org.apache.hadoop.util.ReflectionUtils;
040 import org.apache.hadoop.yarn.conf.HAUtil;
041 import org.apache.hadoop.yarn.conf.YarnConfiguration;
042 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
043 import org.apache.hadoop.yarn.ipc.YarnRPC;
044
045 import com.google.common.annotations.VisibleForTesting;
046
047 @InterfaceAudience.Public
048 @InterfaceStability.Evolving
049 @SuppressWarnings("unchecked")
050 public class RMProxy<T> {
051
052 private static final Log LOG = LogFactory.getLog(RMProxy.class);
053
054 protected RMProxy() {}
055
056 /**
057 * Verify the passed protocol is supported.
058 */
059 @Private
060 protected void checkAllowedProtocols(Class<?> protocol) {}
061
062 /**
063 * Get the ResourceManager address from the provided Configuration for the
064 * given protocol.
065 */
066 @Private
067 protected InetSocketAddress getRMAddress(
068 YarnConfiguration conf, Class<?> protocol) throws IOException {
069 throw new UnsupportedOperationException("This method should be invoked " +
070 "from an instance of ClientRMProxy or ServerRMProxy");
071 }
072
073 /**
074 * Create a proxy for the specified protocol. For non-HA,
075 * this is a direct connection to the ResourceManager address. When HA is
076 * enabled, the proxy handles the failover between the ResourceManagers as
077 * well.
078 */
079 @Private
080 protected static <T> T createRMProxy(final Configuration configuration,
081 final Class<T> protocol, RMProxy instance) throws IOException {
082 YarnConfiguration conf = (configuration instanceof YarnConfiguration)
083 ? (YarnConfiguration) configuration
084 : new YarnConfiguration(configuration);
085 RetryPolicy retryPolicy = createRetryPolicy(conf);
086 if (HAUtil.isHAEnabled(conf)) {
087 RMFailoverProxyProvider<T> provider =
088 instance.createRMFailoverProxyProvider(conf, protocol);
089 return (T) RetryProxy.create(protocol, provider, retryPolicy);
090 } else {
091 InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
092 LOG.info("Connecting to ResourceManager at " + rmAddress);
093 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
094 return (T) RetryProxy.create(protocol, proxy, retryPolicy);
095 }
096 }
097
098 /**
099 * @deprecated
100 * This method is deprecated and is not used by YARN internally any more.
101 * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
102 * ServerRMProxy#createRMProxy.
103 *
104 * Create a proxy to the ResourceManager at the specified address.
105 *
106 * @param conf Configuration to generate retry policy
107 * @param protocol Protocol for the proxy
108 * @param rmAddress Address of the ResourceManager
109 * @param <T> Type information of the proxy
110 * @return Proxy to the RM
111 * @throws IOException
112 */
113 @Deprecated
114 public static <T> T createRMProxy(final Configuration conf,
115 final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
116 RetryPolicy retryPolicy = createRetryPolicy(conf);
117 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
118 LOG.info("Connecting to ResourceManager at " + rmAddress);
119 return (T) RetryProxy.create(protocol, proxy, retryPolicy);
120 }
121
122 /**
123 * Get a proxy to the RM at the specified address. To be used to create a
124 * RetryProxy.
125 */
126 @Private
127 static <T> T getProxy(final Configuration conf,
128 final Class<T> protocol, final InetSocketAddress rmAddress)
129 throws IOException {
130 return UserGroupInformation.getCurrentUser().doAs(
131 new PrivilegedAction<T>() {
132 @Override
133 public T run() {
134 return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
135 }
136 });
137 }
138
139 /**
140 * Helper method to create FailoverProxyProvider.
141 */
142 private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
143 Configuration conf, Class<T> protocol) {
144 Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
145 try {
146 defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
147 Class.forName(
148 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
149 } catch (Exception e) {
150 throw new YarnRuntimeException("Invalid default failover provider class" +
151 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
152 }
153
154 RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
155 conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
156 defaultProviderClass, RMFailoverProxyProvider.class), conf);
157 provider.init(conf, (RMProxy<T>) this, protocol);
158 return provider;
159 }
160
161 /**
162 * Fetch retry policy from Configuration
163 */
164 @Private
165 @VisibleForTesting
166 public static RetryPolicy createRetryPolicy(Configuration conf) {
167 long rmConnectWaitMS =
168 conf.getInt(
169 YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
170 YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
171 long rmConnectionRetryIntervalMS =
172 conf.getLong(
173 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
174 YarnConfiguration
175 .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
176
177 boolean waitForEver = (rmConnectWaitMS == -1);
178 if (!waitForEver) {
179 if (rmConnectWaitMS < 0) {
180 throw new YarnRuntimeException("Invalid Configuration. "
181 + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
182 + " can be -1, but can not be other negative numbers");
183 }
184
185 // try connect once
186 if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
187 LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
188 + " is smaller than "
189 + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
190 + ". Only try connect once.");
191 rmConnectWaitMS = 0;
192 }
193 }
194
195 // Handle HA case first
196 if (HAUtil.isHAEnabled(conf)) {
197 final long failoverSleepBaseMs = conf.getLong(
198 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
199 rmConnectionRetryIntervalMS);
200
201 final long failoverSleepMaxMs = conf.getLong(
202 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
203 rmConnectionRetryIntervalMS);
204
205 int maxFailoverAttempts = conf.getInt(
206 YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
207
208 if (maxFailoverAttempts == -1) {
209 if (waitForEver) {
210 maxFailoverAttempts = Integer.MAX_VALUE;
211 } else {
212 maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
213 }
214 }
215
216 return RetryPolicies.failoverOnNetworkException(
217 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
218 failoverSleepBaseMs, failoverSleepMaxMs);
219 }
220
221 if (waitForEver) {
222 return RetryPolicies.RETRY_FOREVER;
223 }
224
225 if (rmConnectionRetryIntervalMS < 0) {
226 throw new YarnRuntimeException("Invalid Configuration. " +
227 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
228 " should not be negative.");
229 }
230
231 RetryPolicy retryPolicy =
232 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
233 rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
234
235 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
236 new HashMap<Class<? extends Exception>, RetryPolicy>();
237 exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
238 //TO DO: after HADOOP-9576, IOException can be changed to EOFException
239 exceptionToPolicyMap.put(IOException.class, retryPolicy);
240 return RetryPolicies.retryByException(
241 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
242 }
243 }