001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.client; 020 021 import java.io.EOFException; 022 import java.net.ConnectException; 023 import java.net.InetSocketAddress; 024 import java.net.NoRouteToHostException; 025 import java.net.SocketException; 026 import java.net.UnknownHostException; 027 import java.security.PrivilegedAction; 028 import java.util.HashMap; 029 import java.util.Map; 030 import java.util.concurrent.TimeUnit; 031 032 import org.apache.hadoop.classification.InterfaceAudience.Public; 033 import org.apache.hadoop.classification.InterfaceStability.Unstable; 034 import org.apache.hadoop.conf.Configuration; 035 import org.apache.hadoop.io.retry.RetryPolicies; 036 import org.apache.hadoop.io.retry.RetryPolicy; 037 import org.apache.hadoop.io.retry.RetryProxy; 038 import org.apache.hadoop.ipc.RetriableException; 039 import org.apache.hadoop.net.ConnectTimeoutException; 040 import org.apache.hadoop.security.UserGroupInformation; 041 import org.apache.hadoop.yarn.ipc.YarnRPC; 042 043 import com.google.common.base.Preconditions; 044 045 @Public 046 @Unstable 047 public class ServerProxy { 048 049 protected static RetryPolicy createRetryPolicy(Configuration conf, 050 String maxWaitTimeStr, long defMaxWaitTime, 051 String connectRetryIntervalStr, long defRetryInterval) { 052 long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime); 053 long retryIntervalMS = 054 conf.getLong(connectRetryIntervalStr, defRetryInterval); 055 if (maxWaitTime == -1) { 056 // wait forever. 057 return RetryPolicies.RETRY_FOREVER; 058 } 059 060 Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. " 061 + maxWaitTimeStr + " should be a positive value."); 062 Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. " 063 + connectRetryIntervalStr + "should be a positive value."); 064 065 RetryPolicy retryPolicy = 066 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime, 067 retryIntervalMS, TimeUnit.MILLISECONDS); 068 069 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 070 new HashMap<Class<? extends Exception>, RetryPolicy>(); 071 exceptionToPolicyMap.put(EOFException.class, retryPolicy); 072 exceptionToPolicyMap.put(ConnectException.class, retryPolicy); 073 exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy); 074 exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy); 075 exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); 076 exceptionToPolicyMap.put(RetriableException.class, retryPolicy); 077 exceptionToPolicyMap.put(SocketException.class, retryPolicy); 078 079 return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, 080 exceptionToPolicyMap); 081 } 082 083 @SuppressWarnings("unchecked") 084 protected static <T> T createRetriableProxy(final Configuration conf, 085 final Class<T> protocol, final UserGroupInformation user, 086 final YarnRPC rpc, final InetSocketAddress serverAddress, 087 RetryPolicy retryPolicy) { 088 T proxy = user.doAs(new PrivilegedAction<T>() { 089 @Override 090 public T run() { 091 return (T) rpc.getProxy(protocol, serverAddress, conf); 092 } 093 }); 094 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 095 } 096 }