001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.hadoop.yarn.client;
020
021import java.net.InetSocketAddress;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.io.Text;
031import org.apache.hadoop.ipc.RPC;
032import org.apache.hadoop.yarn.api.ClientRMProtocol;
033import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
034import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
035import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
036import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
037import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
038import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
039import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
040import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
041import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
042import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
043import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
044import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
045import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
046import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
047import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
048import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
049import org.apache.hadoop.yarn.api.records.ApplicationId;
050import org.apache.hadoop.yarn.api.records.ApplicationReport;
051import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
052import org.apache.hadoop.yarn.api.records.DelegationToken;
053import org.apache.hadoop.yarn.api.records.NodeReport;
054import org.apache.hadoop.yarn.api.records.QueueInfo;
055import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
056import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
057import org.apache.hadoop.yarn.conf.YarnConfiguration;
058import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
059import org.apache.hadoop.yarn.ipc.YarnRPC;
060import org.apache.hadoop.yarn.service.AbstractService;
061import org.apache.hadoop.yarn.util.Records;
062
063@InterfaceAudience.Public
064@InterfaceStability.Evolving
065public class YarnClientImpl extends AbstractService implements YarnClient {
066
067  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
068
069  protected ClientRMProtocol rmClient;
070  protected InetSocketAddress rmAddress;
071
072  private static final String ROOT = "root";
073
074  public YarnClientImpl() {
075    super(YarnClientImpl.class.getName());
076  }
077
078  private static InetSocketAddress getRmAddress(Configuration conf) {
079    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
080      YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
081  }
082
083  @Override
084  public synchronized void init(Configuration conf) {
085    this.rmAddress = getRmAddress(conf);
086    super.init(conf);
087  }
088
089  @Override
090  public synchronized void start() {
091    YarnRPC rpc = YarnRPC.create(getConfig());
092
093    this.rmClient =
094        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress,
095          getConfig());
096    LOG.debug("Connecting to ResourceManager at " + rmAddress);
097    super.start();
098  }
099
100  @Override
101  public synchronized void stop() {
102    RPC.stopProxy(this.rmClient);
103    super.stop();
104  }
105
106  @Override
107  public GetNewApplicationResponse getNewApplication()
108      throws YarnRemoteException {
109    GetNewApplicationRequest request =
110        Records.newRecord(GetNewApplicationRequest.class);
111    return rmClient.getNewApplication(request);
112  }
113
114  @Override
115  public ApplicationId
116      submitApplication(ApplicationSubmissionContext appContext)
117          throws YarnRemoteException {
118    ApplicationId applicationId = appContext.getApplicationId();
119    appContext.setApplicationId(applicationId);
120    SubmitApplicationRequest request =
121        Records.newRecord(SubmitApplicationRequest.class);
122    request.setApplicationSubmissionContext(appContext);
123    rmClient.submitApplication(request);
124    LOG.info("Submitted application " + applicationId + " to ResourceManager"
125        + " at " + rmAddress);
126    return applicationId;
127  }
128
129  @Override
130  public void killApplication(ApplicationId applicationId)
131      throws YarnRemoteException {
132    LOG.info("Killing application " + applicationId);
133    KillApplicationRequest request =
134        Records.newRecord(KillApplicationRequest.class);
135    request.setApplicationId(applicationId);
136    rmClient.forceKillApplication(request);
137  }
138
139  @Override
140  public ApplicationReport getApplicationReport(ApplicationId appId)
141      throws YarnRemoteException {
142    GetApplicationReportRequest request =
143        Records.newRecord(GetApplicationReportRequest.class);
144    request.setApplicationId(appId);
145    GetApplicationReportResponse response =
146        rmClient.getApplicationReport(request);
147    return response.getApplicationReport();
148  }
149
150  @Override
151  public List<ApplicationReport> getApplicationList()
152      throws YarnRemoteException {
153    GetAllApplicationsRequest request =
154        Records.newRecord(GetAllApplicationsRequest.class);
155    GetAllApplicationsResponse response = rmClient.getAllApplications(request);
156    return response.getApplicationList();
157  }
158
159  @Override
160  public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
161    GetClusterMetricsRequest request =
162        Records.newRecord(GetClusterMetricsRequest.class);
163    GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
164    return response.getClusterMetrics();
165  }
166
167  @Override
168  public List<NodeReport> getNodeReports() throws YarnRemoteException {
169    GetClusterNodesRequest request =
170        Records.newRecord(GetClusterNodesRequest.class);
171    GetClusterNodesResponse response = rmClient.getClusterNodes(request);
172    return response.getNodeReports();
173  }
174
175  @Override
176  public DelegationToken getRMDelegationToken(Text renewer)
177      throws YarnRemoteException {
178    /* get the token from RM */
179    GetDelegationTokenRequest rmDTRequest =
180        Records.newRecord(GetDelegationTokenRequest.class);
181    rmDTRequest.setRenewer(renewer.toString());
182    GetDelegationTokenResponse response =
183        rmClient.getDelegationToken(rmDTRequest);
184    return response.getRMDelegationToken();
185  }
186
187  private GetQueueInfoRequest
188      getQueueInfoRequest(String queueName, boolean includeApplications,
189          boolean includeChildQueues, boolean recursive) {
190    GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
191    request.setQueueName(queueName);
192    request.setIncludeApplications(includeApplications);
193    request.setIncludeChildQueues(includeChildQueues);
194    request.setRecursive(recursive);
195    return request;
196  }
197
198  @Override
199  public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
200    GetQueueInfoRequest request =
201        getQueueInfoRequest(queueName, true, false, false);
202    Records.newRecord(GetQueueInfoRequest.class);
203    return rmClient.getQueueInfo(request).getQueueInfo();
204  }
205
206  @Override
207  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
208    GetQueueUserAclsInfoRequest request =
209        Records.newRecord(GetQueueUserAclsInfoRequest.class);
210    return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
211  }
212
213  @Override
214  public List<QueueInfo> getAllQueues() throws YarnRemoteException {
215    List<QueueInfo> queues = new ArrayList<QueueInfo>();
216
217    QueueInfo rootQueue =
218        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
219          .getQueueInfo();
220    getChildQueues(rootQueue, queues, true);
221    return queues;
222  }
223
224  @Override
225  public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
226    List<QueueInfo> queues = new ArrayList<QueueInfo>();
227
228    QueueInfo rootQueue =
229        rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
230          .getQueueInfo();
231    getChildQueues(rootQueue, queues, false);
232    return queues;
233  }
234
235  @Override
236  public List<QueueInfo> getChildQueueInfos(String parent)
237      throws YarnRemoteException {
238    List<QueueInfo> queues = new ArrayList<QueueInfo>();
239
240    QueueInfo parentQueue =
241        rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
242          .getQueueInfo();
243    getChildQueues(parentQueue, queues, true);
244    return queues;
245  }
246
247  private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
248      boolean recursive) {
249    List<QueueInfo> childQueues = parent.getChildQueues();
250
251    for (QueueInfo child : childQueues) {
252      queues.add(child);
253      if (recursive) {
254        getChildQueues(child, queues, recursive);
255      }
256    }
257  }
258}