/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records;

/**
 * {@link YarnClient} implementation that forwards every call to the
 * ResourceManager through a {@link ClientRMProtocol} RPC proxy.
 *
 * <p>The ResourceManager address is resolved in {@link #init(Configuration)},
 * the proxy is created in {@link #start()} and released in {@link #stop()}.
 * The RPC-backed methods dereference {@code rmClient} directly, so they must
 * only be called after {@link #start()}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class YarnClientImpl extends AbstractService implements YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  /** RPC proxy to the ResourceManager; {@code null} until {@link #start()}. */
  protected ClientRMProtocol rmClient;

  /** ResourceManager address; resolved in {@link #init(Configuration)}. */
  protected InetSocketAddress rmAddress;

  /** Queue name used as the starting point for queue listings. */
  private static final String ROOT = "root";

  /** Creates the service, named after this class. */
  public YarnClientImpl() {
    super(YarnClientImpl.class.getName());
  }

  /**
   * Reads the ResourceManager address from the configuration.
   *
   * @param conf configuration to read {@link YarnConfiguration#RM_ADDRESS}
   *          from
   * @return the configured address, or the one built from
   *         {@link YarnConfiguration#DEFAULT_RM_ADDRESS} and
   *         {@link YarnConfiguration#DEFAULT_RM_PORT}
   */
  private static InetSocketAddress getRmAddress(Configuration conf) {
    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);
  }

  /**
   * Resolves the ResourceManager address, then delegates to the superclass.
   *
   * @param conf configuration used by this service
   */
  @Override
  public synchronized void init(Configuration conf) {
    this.rmAddress = getRmAddress(conf);
    super.init(conf);
  }

  /**
   * Creates the {@link ClientRMProtocol} proxy for the address resolved in
   * {@link #init(Configuration)}, then delegates to the superclass.
   */
  @Override
  public synchronized void start() {
    YarnRPC rpc = YarnRPC.create(getConfig());

    this.rmClient =
        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress,
            getConfig());
    // Avoid building the message when debug logging is off.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to ResourceManager at " + rmAddress);
    }
    super.start();
  }

  /**
   * Releases the RPC proxy, if one was created, then delegates to the
   * superclass.
   */
  @Override
  public synchronized void stop() {
    // rmClient is only assigned in start(); skip the proxy teardown when
    // stop() is reached without it so a null is never handed to the RPC layer.
    if (this.rmClient != null) {
      RPC.stopProxy(this.rmClient);
    }
    super.stop();
  }

  /**
   * Sends a {@link GetNewApplicationRequest} to the ResourceManager.
   *
   * @return the ResourceManager's response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public GetNewApplicationResponse getNewApplication()
      throws YarnRemoteException {
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    return rmClient.getNewApplication(request);
  }

  /**
   * Submits an application to the ResourceManager.
   *
   * @param appContext submission context; its application id is the one
   *          returned
   * @return the application id carried by {@code appContext}
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public ApplicationId submitApplication(
      ApplicationSubmissionContext appContext) throws YarnRemoteException {
    ApplicationId applicationId = appContext.getApplicationId();
    // NOTE(review): this writes back the id that was just read, which looks
    // redundant; kept as-is pending confirmation that the setter has no
    // side effect callers rely on.
    appContext.setApplicationId(applicationId);
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
    rmClient.submitApplication(request);
    LOG.info("Submitted application " + applicationId + " to ResourceManager"
        + " at " + rmAddress);
    return applicationId;
  }

  /**
   * Asks the ResourceManager to forcibly kill an application.
   *
   * @param applicationId id of the application to kill
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public void killApplication(ApplicationId applicationId)
      throws YarnRemoteException {
    LOG.info("Killing application " + applicationId);
    KillApplicationRequest request =
        Records.newRecord(KillApplicationRequest.class);
    request.setApplicationId(applicationId);
    rmClient.forceKillApplication(request);
  }

  /**
   * Fetches the report for a single application.
   *
   * @param appId id of the application
   * @return the report contained in the ResourceManager's response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public ApplicationReport getApplicationReport(ApplicationId appId)
      throws YarnRemoteException {
    GetApplicationReportRequest request =
        Records.newRecord(GetApplicationReportRequest.class);
    request.setApplicationId(appId);
    GetApplicationReportResponse response =
        rmClient.getApplicationReport(request);
    return response.getApplicationReport();
  }

  /**
   * Fetches the application reports returned by the ResourceManager's
   * "get all applications" call.
   *
   * @return the application list from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<ApplicationReport> getApplicationList()
      throws YarnRemoteException {
    GetAllApplicationsRequest request =
        Records.newRecord(GetAllApplicationsRequest.class);
    GetAllApplicationsResponse response = rmClient.getAllApplications(request);
    return response.getApplicationList();
  }

  /**
   * Fetches the cluster metrics from the ResourceManager.
   *
   * @return the metrics from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
    GetClusterMetricsRequest request =
        Records.newRecord(GetClusterMetricsRequest.class);
    GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
    return response.getClusterMetrics();
  }

  /**
   * Fetches the node reports from the ResourceManager.
   *
   * @return the node reports from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<NodeReport> getNodeReports() throws YarnRemoteException {
    GetClusterNodesRequest request =
        Records.newRecord(GetClusterNodesRequest.class);
    GetClusterNodesResponse response = rmClient.getClusterNodes(request);
    return response.getNodeReports();
  }

  /**
   * Requests a delegation token from the ResourceManager.
   *
   * @param renewer renewer to set on the request; must not be {@code null}
   *          because it is converted with {@code toString()}
   * @return the delegation token from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public DelegationToken getRMDelegationToken(Text renewer)
      throws YarnRemoteException {
    /* get the token from RM */
    GetDelegationTokenRequest rmDTRequest =
        Records.newRecord(GetDelegationTokenRequest.class);
    rmDTRequest.setRenewer(renewer.toString());
    GetDelegationTokenResponse response =
        rmClient.getDelegationToken(rmDTRequest);
    return response.getRMDelegationToken();
  }

  /**
   * Builds a {@link GetQueueInfoRequest} with the given settings.
   *
   * @param queueName queue name to set on the request
   * @param includeApplications value for the include-applications flag
   * @param includeChildQueues value for the include-child-queues flag
   * @param recursive value for the recursive flag
   * @return the populated request
   */
  private GetQueueInfoRequest getQueueInfoRequest(String queueName,
      boolean includeApplications, boolean includeChildQueues,
      boolean recursive) {
    GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
    request.setQueueName(queueName);
    request.setIncludeApplications(includeApplications);
    request.setIncludeChildQueues(includeChildQueues);
    request.setRecursive(recursive);
    return request;
  }

  /**
   * Fetches information about one queue, requesting its applications but not
   * its child queues.
   *
   * @param queueName name of the queue
   * @return the queue information from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
    GetQueueInfoRequest request =
        getQueueInfoRequest(queueName, true, false, false);
    return rmClient.getQueueInfo(request).getQueueInfo();
  }

  /**
   * Fetches the queue ACL information from the ResourceManager's
   * "get queue user ACLs" call.
   *
   * @return the ACL info list from the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
    GetQueueUserAclsInfoRequest request =
        Records.newRecord(GetQueueUserAclsInfoRequest.class);
    return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
  }

  /**
   * Lists every queue below the root queue. The root itself is not included.
   * Each queue appears before its own descendants.
   *
   * @return flattened list of the root queue's descendants
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<QueueInfo> getAllQueues() throws YarnRemoteException {
    List<QueueInfo> queues = new ArrayList<QueueInfo>();

    QueueInfo rootQueue =
        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
            .getQueueInfo();
    getChildQueues(rootQueue, queues, true);
    return queues;
  }

  /**
   * Lists the direct children of the root queue.
   *
   * @return the first-level queues under the root
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
    List<QueueInfo> queues = new ArrayList<QueueInfo>();

    // NOTE(review): the request asks for a recursive listing although only
    // the first level is collected below; left unchanged to keep the RPC
    // payload identical.
    QueueInfo rootQueue =
        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
            .getQueueInfo();
    getChildQueues(rootQueue, queues, false);
    return queues;
  }

  /**
   * Lists the queues found below the given parent queue in the
   * ResourceManager's response.
   *
   * @param parent name of the parent queue
   * @return the child queues of {@code parent}, followed depth-first by any
   *         descendants present in the response
   * @throws YarnRemoteException if the remote call fails
   */
  @Override
  public List<QueueInfo> getChildQueueInfos(String parent)
      throws YarnRemoteException {
    List<QueueInfo> queues = new ArrayList<QueueInfo>();

    // NOTE(review): the request is non-recursive while the walk below is
    // recursive, so how deep the result goes depends on what the server
    // populates - confirm the intended depth.
    QueueInfo parentQueue =
        rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
            .getQueueInfo();
    getChildQueues(parentQueue, queues, true);
    return queues;
  }

  /**
   * Appends the children of {@code parent} to {@code queues}. When
   * {@code recursive} is set, each child's own descendants are added right
   * after the child.
   *
   * @param parent queue whose children are collected
   * @param queues output list that receives the queues
   * @param recursive whether to descend below the direct children
   */
  private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
      boolean recursive) {
    List<QueueInfo> childQueues = parent.getChildQueues();

    for (QueueInfo child : childQueues) {
      queues.add(child);
      if (recursive) {
        getChildQueues(child, queues, recursive);
      }
    }
  }
}