001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.client; 020 021import java.io.EOFException; 022import java.net.ConnectException; 023import java.net.InetSocketAddress; 024import java.net.NoRouteToHostException; 025import java.net.SocketException; 026import java.net.UnknownHostException; 027import java.security.PrivilegedAction; 028import java.util.HashMap; 029import java.util.Map; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hadoop.classification.InterfaceAudience.Public; 033import org.apache.hadoop.classification.InterfaceStability.Unstable; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.io.retry.RetryPolicies; 036import org.apache.hadoop.io.retry.RetryPolicy; 037import org.apache.hadoop.io.retry.RetryProxy; 038import org.apache.hadoop.ipc.RetriableException; 039import org.apache.hadoop.net.ConnectTimeoutException; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; 042import org.apache.hadoop.yarn.ipc.YarnRPC; 043 044import com.google.common.base.Preconditions; 045 046@Public 047@Unstable 048public class ServerProxy { 049 050 protected static RetryPolicy createRetryPolicy(Configuration conf, 051 String maxWaitTimeStr, long defMaxWaitTime, 052 String connectRetryIntervalStr, long defRetryInterval) { 053 long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime); 054 long retryIntervalMS = 055 conf.getLong(connectRetryIntervalStr, defRetryInterval); 056 if (maxWaitTime == -1) { 057 // wait forever. 058 return RetryPolicies.RETRY_FOREVER; 059 } 060 061 Preconditions.checkArgument(maxWaitTime > 0, "Invalid Configuration. " 062 + maxWaitTimeStr + " should be a positive value."); 063 Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. " 064 + connectRetryIntervalStr + "should be a positive value."); 065 066 RetryPolicy retryPolicy = 067 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime, 068 retryIntervalMS, TimeUnit.MILLISECONDS); 069 070 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 071 new HashMap<Class<? extends Exception>, RetryPolicy>(); 072 exceptionToPolicyMap.put(EOFException.class, retryPolicy); 073 exceptionToPolicyMap.put(ConnectException.class, retryPolicy); 074 exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy); 075 exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy); 076 exceptionToPolicyMap.put(RetriableException.class, retryPolicy); 077 exceptionToPolicyMap.put(SocketException.class, retryPolicy); 078 exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy); 079 080 return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, 081 exceptionToPolicyMap); 082 } 083 084 @SuppressWarnings("unchecked") 085 protected static <T> T createRetriableProxy(final Configuration conf, 086 final Class<T> protocol, final UserGroupInformation user, 087 final YarnRPC rpc, final InetSocketAddress serverAddress, 088 RetryPolicy retryPolicy) { 089 T proxy = user.doAs(new PrivilegedAction<T>() { 090 @Override 091 public T run() { 092 return (T) rpc.getProxy(protocol, serverAddress, conf); 093 } 094 }); 095 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 096 } 097}