001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.client; 020 021import java.io.EOFException; 022import java.net.ConnectException; 023import java.net.InetSocketAddress; 024import java.net.NoRouteToHostException; 025import java.net.SocketException; 026import java.net.UnknownHostException; 027import java.security.PrivilegedAction; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hadoop.classification.InterfaceAudience.Public; 033import org.apache.hadoop.classification.InterfaceStability.Unstable; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.io.retry.RetryPolicies; 036import org.apache.hadoop.io.retry.RetryPolicy; 037import org.apache.hadoop.io.retry.RetryProxy; 038import org.apache.hadoop.ipc.RetriableException; 039import org.apache.hadoop.net.ConnectTimeoutException; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.hadoop.yarn.ipc.YarnRPC; 042 043import com.google.common.base.Preconditions; 044 045@Public 046@Unstable 047public class ServerProxy { 048 049 protected static RetryPolicy createRetryPolicy(Configuration conf, 050 String maxWaitTimeStr, long defMaxWaitTime, 051 String connectRetryIntervalStr, long defRetryInterval) { 052 long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime); 053 long retryIntervalMS = 054 conf.getLong(connectRetryIntervalStr, defRetryInterval); 055 if (maxWaitTime == -1) { 056 // wait forever. 057 return RetryPolicies.RETRY_FOREVER; 058 } 059 060 Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. " 061 + maxWaitTimeStr + " should be a positive value."); 062 Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. " 063 + connectRetryIntervalStr + "should be a positive value."); 064 065 RetryPolicy retryPolicy = 066 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime, 067 retryIntervalMS, TimeUnit.MILLISECONDS); 068 069 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 070 new HashMap<Class<? extends Exception>, RetryPolicy>(); 071 exceptionToPolicyMap.put(EOFException.class, retryPolicy); 072 exceptionToPolicyMap.put(ConnectException.class, retryPolicy); 073 exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy); 074 exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy); 075 exceptionToPolicyMap.put(RetriableException.class, retryPolicy); 076 exceptionToPolicyMap.put(SocketException.class, retryPolicy); 077 078 return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, 079 exceptionToPolicyMap); 080 } 081 082 @SuppressWarnings("unchecked") 083 protected static <T> T createRetriableProxy(final Configuration conf, 084 final Class<T> protocol, final UserGroupInformation user, 085 final YarnRPC rpc, final InetSocketAddress serverAddress, 086 RetryPolicy retryPolicy) { 087 T proxy = user.doAs(new PrivilegedAction<T>() { 088 @Override 089 public T run() { 090 return (T) rpc.getProxy(protocol, serverAddress, conf); 091 } 092 }); 093 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 094 } 095}