001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client; 020 021 import java.io.IOException; 022 import java.net.ConnectException; 023 import java.net.InetSocketAddress; 024 import java.security.PrivilegedAction; 025 import java.util.HashMap; 026 import java.util.Map; 027 import java.util.concurrent.TimeUnit; 028 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 import org.apache.hadoop.classification.InterfaceAudience; 032 import org.apache.hadoop.classification.InterfaceAudience.Private; 033 import org.apache.hadoop.classification.InterfaceStability; 034 import org.apache.hadoop.conf.Configuration; 035 import org.apache.hadoop.io.retry.RetryPolicies; 036 import org.apache.hadoop.io.retry.RetryPolicy; 037 import org.apache.hadoop.io.retry.RetryProxy; 038 import org.apache.hadoop.security.UserGroupInformation; 039 import org.apache.hadoop.util.ReflectionUtils; 040 import org.apache.hadoop.yarn.conf.HAUtil; 041 import org.apache.hadoop.yarn.conf.YarnConfiguration; 042 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 043 import org.apache.hadoop.yarn.ipc.YarnRPC; 044 045 import com.google.common.annotations.VisibleForTesting; 046 047 @InterfaceAudience.Public 048 @InterfaceStability.Evolving 049 @SuppressWarnings("unchecked") 050 public class RMProxy<T> { 051 052 private static final Log LOG = LogFactory.getLog(RMProxy.class); 053 054 protected RMProxy() {} 055 056 /** 057 * Verify the passed protocol is supported. 058 */ 059 @Private 060 protected void checkAllowedProtocols(Class<?> protocol) {} 061 062 /** 063 * Get the ResourceManager address from the provided Configuration for the 064 * given protocol. 065 */ 066 @Private 067 protected InetSocketAddress getRMAddress( 068 YarnConfiguration conf, Class<?> protocol) throws IOException { 069 throw new UnsupportedOperationException("This method should be invoked " + 070 "from an instance of ClientRMProxy or ServerRMProxy"); 071 } 072 073 /** 074 * Create a proxy for the specified protocol. For non-HA, 075 * this is a direct connection to the ResourceManager address. When HA is 076 * enabled, the proxy handles the failover between the ResourceManagers as 077 * well. 078 */ 079 @Private 080 protected static <T> T createRMProxy(final Configuration configuration, 081 final Class<T> protocol, RMProxy instance) throws IOException { 082 YarnConfiguration conf = (configuration instanceof YarnConfiguration) 083 ? (YarnConfiguration) configuration 084 : new YarnConfiguration(configuration); 085 RetryPolicy retryPolicy = createRetryPolicy(conf); 086 if (HAUtil.isHAEnabled(conf)) { 087 RMFailoverProxyProvider<T> provider = 088 instance.createRMFailoverProxyProvider(conf, protocol); 089 return (T) RetryProxy.create(protocol, provider, retryPolicy); 090 } else { 091 InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol); 092 LOG.info("Connecting to ResourceManager at " + rmAddress); 093 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); 094 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 095 } 096 } 097 098 /** 099 * @deprecated 100 * This method is deprecated and is not used by YARN internally any more. 101 * To create a proxy to the RM, use ClientRMProxy#createRMProxy or 102 * ServerRMProxy#createRMProxy. 103 * 104 * Create a proxy to the ResourceManager at the specified address. 105 * 106 * @param conf Configuration to generate retry policy 107 * @param protocol Protocol for the proxy 108 * @param rmAddress Address of the ResourceManager 109 * @param <T> Type information of the proxy 110 * @return Proxy to the RM 111 * @throws IOException 112 */ 113 @Deprecated 114 public static <T> T createRMProxy(final Configuration conf, 115 final Class<T> protocol, InetSocketAddress rmAddress) throws IOException { 116 RetryPolicy retryPolicy = createRetryPolicy(conf); 117 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); 118 LOG.info("Connecting to ResourceManager at " + rmAddress); 119 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 120 } 121 122 /** 123 * Get a proxy to the RM at the specified address. To be used to create a 124 * RetryProxy. 125 */ 126 @Private 127 static <T> T getProxy(final Configuration conf, 128 final Class<T> protocol, final InetSocketAddress rmAddress) 129 throws IOException { 130 return UserGroupInformation.getCurrentUser().doAs( 131 new PrivilegedAction<T>() { 132 @Override 133 public T run() { 134 return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf); 135 } 136 }); 137 } 138 139 /** 140 * Helper method to create FailoverProxyProvider. 141 */ 142 private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider( 143 Configuration conf, Class<T> protocol) { 144 Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass; 145 try { 146 defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>) 147 Class.forName( 148 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER); 149 } catch (Exception e) { 150 throw new YarnRuntimeException("Invalid default failover provider class" + 151 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e); 152 } 153 154 RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance( 155 conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, 156 defaultProviderClass, RMFailoverProxyProvider.class), conf); 157 provider.init(conf, (RMProxy<T>) this, protocol); 158 return provider; 159 } 160 161 /** 162 * Fetch retry policy from Configuration 163 */ 164 @Private 165 @VisibleForTesting 166 public static RetryPolicy createRetryPolicy(Configuration conf) { 167 long rmConnectWaitMS = 168 conf.getInt( 169 YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 170 YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); 171 long rmConnectionRetryIntervalMS = 172 conf.getLong( 173 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 174 YarnConfiguration 175 .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); 176 177 boolean waitForEver = (rmConnectWaitMS == -1); 178 if (!waitForEver) { 179 if (rmConnectWaitMS < 0) { 180 throw new YarnRuntimeException("Invalid Configuration. " 181 + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS 182 + " can be -1, but can not be other negative numbers"); 183 } 184 185 // try connect once 186 if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { 187 LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS 188 + " is smaller than " 189 + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS 190 + ". Only try connect once."); 191 rmConnectWaitMS = 0; 192 } 193 } 194 195 // Handle HA case first 196 if (HAUtil.isHAEnabled(conf)) { 197 final long failoverSleepBaseMs = conf.getLong( 198 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 199 rmConnectionRetryIntervalMS); 200 201 final long failoverSleepMaxMs = conf.getLong( 202 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 203 rmConnectionRetryIntervalMS); 204 205 int maxFailoverAttempts = conf.getInt( 206 YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1); 207 208 if (maxFailoverAttempts == -1) { 209 if (waitForEver) { 210 maxFailoverAttempts = Integer.MAX_VALUE; 211 } else { 212 maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs); 213 } 214 } 215 216 return RetryPolicies.failoverOnNetworkException( 217 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 218 failoverSleepBaseMs, failoverSleepMaxMs); 219 } 220 221 if (waitForEver) { 222 return RetryPolicies.RETRY_FOREVER; 223 } 224 225 if (rmConnectionRetryIntervalMS < 0) { 226 throw new YarnRuntimeException("Invalid Configuration. " + 227 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + 228 " should not be negative."); 229 } 230 231 RetryPolicy retryPolicy = 232 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, 233 rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); 234 235 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 236 new HashMap<Class<? extends Exception>, RetryPolicy>(); 237 exceptionToPolicyMap.put(ConnectException.class, retryPolicy); 238 //TO DO: after HADOOP-9576, IOException can be changed to EOFException 239 exceptionToPolicyMap.put(IOException.class, retryPolicy); 240 return RetryPolicies.retryByException( 241 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 242 } 243 }