/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client;

import java.io.EOFException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.ipc.YarnRPC;

import com.google.common.base.Preconditions;

/**
 * Static helpers for building retry-enabled RPC proxies: one builds a
 * {@link RetryPolicy} from configuration, the other wraps an RPC proxy so
 * that calls through it are governed by such a policy.
 */
@Public
@Unstable
public class ServerProxy {

  /**
   * Builds a retry policy from two configuration values.
   *
   * <p>The value read from {@code maxWaitTimeStr} selects the policy applied
   * to the connection-related exceptions registered below: {@code -1} selects
   * {@link RetryPolicies#RETRY_FOREVER}; any positive value selects
   * {@code retryUpToMaximumTimeWithFixedSleep}, with both the maximum wait
   * and the retry interval passed as milliseconds. Exceptions that are not
   * registered are handled by {@link RetryPolicies#TRY_ONCE_THEN_FAIL}.
   *
   * <p>Note that the retry interval is validated in both cases, but it is
   * only handed to the policy in the bounded (non {@code -1}) case.
   *
   * @param conf configuration to read both values from
   * @param maxWaitTimeStr configuration key of the maximum wait time
   * @param defMaxWaitTime value used when {@code maxWaitTimeStr} is not set
   * @param connectRetryIntervalStr configuration key of the retry interval
   * @param defRetryInterval value used when {@code connectRetryIntervalStr}
   *          is not set
   * @return a policy that dispatches on the exception type
   * @throws IllegalArgumentException if the maximum wait time is neither
   *           {@code -1} nor positive, or if the retry interval is not
   *           positive
   */
  protected static RetryPolicy createRetryPolicy(Configuration conf,
      String maxWaitTimeStr, long defMaxWaitTime,
      String connectRetryIntervalStr, long defRetryInterval) {
    long maxWaitTime = conf.getLong(maxWaitTimeStr, defMaxWaitTime);
    long retryIntervalMS =
        conf.getLong(connectRetryIntervalStr, defRetryInterval);

    Preconditions.checkArgument((maxWaitTime == -1 || maxWaitTime > 0),
        "Invalid Configuration. " + maxWaitTimeStr + " should be either"
            + " positive value or -1.");
    // The leading space before "should" keeps the configuration key readable
    // in the resulting error message.
    Preconditions.checkArgument(retryIntervalMS > 0, "Invalid Configuration. "
        + connectRetryIntervalStr + " should be a positive value.");

    RetryPolicy retryPolicy = null;
    if (maxWaitTime == -1) {
      // wait forever.
      retryPolicy = RetryPolicies.RETRY_FOREVER;
    } else {
      retryPolicy =
          RetryPolicies.retryUpToMaximumTimeWithFixedSleep(maxWaitTime,
              retryIntervalMS, TimeUnit.MILLISECONDS);
    }

    // Every exception type listed here shares the policy chosen above.
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(EOFException.class, retryPolicy);
    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
    exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);

    return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
        exceptionToPolicyMap);
  }

  /**
   * Creates an RPC proxy for {@code protocol} and wraps it in a retrying
   * proxy.
   *
   * <p>The underlying proxy is obtained from {@code rpc.getProxy} inside
   * {@code user.doAs}, then passed to {@link RetryProxy#create} together with
   * {@code retryPolicy}.
   *
   * @param <T> the protocol interface type
   * @param conf configuration handed to {@code rpc.getProxy}
   * @param protocol protocol interface the proxy implements
   * @param user user on whose behalf the underlying proxy is created
   * @param rpc RPC factory used to create the underlying proxy
   * @param serverAddress address of the server to connect to
   * @param retryPolicy policy applied by the returned retrying proxy
   * @return the retrying proxy, typed as {@code T}
   */
  @SuppressWarnings("unchecked")
  protected static <T> T createRetriableProxy(final Configuration conf,
      final Class<T> protocol, final UserGroupInformation user,
      final YarnRPC rpc, final InetSocketAddress serverAddress,
      RetryPolicy retryPolicy) {
    T proxy = user.doAs(new PrivilegedAction<T>() {
      @Override
      public T run() {
        return (T) rpc.getProxy(protocol, serverAddress, conf);
      }
    });
    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
  }
}