001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.FileNotFoundException;
022 import java.io.IOException;
023 import java.net.InetSocketAddress;
024 import java.security.PrivilegedExceptionAction;
025 import java.util.ArrayList;
026 import java.util.List;
027 import java.util.ServiceLoader;
028
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031 import org.apache.hadoop.classification.InterfaceAudience;
032 import org.apache.hadoop.classification.InterfaceStability;
033 import org.apache.hadoop.conf.Configuration;
034 import org.apache.hadoop.fs.FileSystem;
035 import org.apache.hadoop.fs.Path;
036 import org.apache.hadoop.io.Text;
037 import org.apache.hadoop.mapred.JobConf;
038 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041 import org.apache.hadoop.mapreduce.util.ConfigUtil;
042 import org.apache.hadoop.mapreduce.v2.LogParams;
043 import org.apache.hadoop.security.UserGroupInformation;
044 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
045 import org.apache.hadoop.security.token.Token;
046
047 /**
048 * Provides a way to access information about the map/reduce cluster.
049 */
050 @InterfaceAudience.Public
051 @InterfaceStability.Evolving
052 public class Cluster {
053
054 @InterfaceStability.Evolving
055 public static enum JobTrackerStatus {INITIALIZING, RUNNING};
056
057 private ClientProtocolProvider clientProtocolProvider;
058 private ClientProtocol client;
059 private UserGroupInformation ugi;
060 private Configuration conf;
061 private FileSystem fs = null;
062 private Path sysDir = null;
063 private Path stagingAreaDir = null;
064 private Path jobHistoryDir = null;
065 private static final Log LOG = LogFactory.getLog(Cluster.class);
066
067 private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
068 ServiceLoader.load(ClientProtocolProvider.class);
069
070 static {
071 ConfigUtil.loadResources();
072 }
073
074 public Cluster(Configuration conf) throws IOException {
075 this(null, conf);
076 }
077
078 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
079 throws IOException {
080 this.conf = conf;
081 this.ugi = UserGroupInformation.getCurrentUser();
082 initialize(jobTrackAddr, conf);
083 }
084
085 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
086 throws IOException {
087
088 synchronized (frameworkLoader) {
089 for (ClientProtocolProvider provider : frameworkLoader) {
090 LOG.debug("Trying ClientProtocolProvider : "
091 + provider.getClass().getName());
092 ClientProtocol clientProtocol = null;
093 try {
094 if (jobTrackAddr == null) {
095 clientProtocol = provider.create(conf);
096 } else {
097 clientProtocol = provider.create(jobTrackAddr, conf);
098 }
099
100 if (clientProtocol != null) {
101 clientProtocolProvider = provider;
102 client = clientProtocol;
103 LOG.debug("Picked " + provider.getClass().getName()
104 + " as the ClientProtocolProvider");
105 break;
106 }
107 else {
108 LOG.debug("Cannot pick " + provider.getClass().getName()
109 + " as the ClientProtocolProvider - returned null protocol");
110 }
111 }
112 catch (Exception e) {
113 LOG.info("Failed to use " + provider.getClass().getName()
114 + " due to error: " + e.getMessage());
115 }
116 }
117 }
118
119 if (null == clientProtocolProvider || null == client) {
120 throw new IOException(
121 "Cannot initialize Cluster. Please check your configuration for "
122 + MRConfig.FRAMEWORK_NAME
123 + " and the correspond server addresses.");
124 }
125 }
126
127 ClientProtocol getClient() {
128 return client;
129 }
130
131 Configuration getConf() {
132 return conf;
133 }
134
135 /**
136 * Close the <code>Cluster</code>.
137 */
138 public synchronized void close() throws IOException {
139 clientProtocolProvider.close(client);
140 }
141
142 private Job[] getJobs(JobStatus[] stats) throws IOException {
143 List<Job> jobs = new ArrayList<Job>();
144 for (JobStatus stat : stats) {
145 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
146 }
147 return jobs.toArray(new Job[0]);
148 }
149
150 /**
151 * Get the file system where job-specific files are stored
152 *
153 * @return object of FileSystem
154 * @throws IOException
155 * @throws InterruptedException
156 */
157 public synchronized FileSystem getFileSystem()
158 throws IOException, InterruptedException {
159 if (this.fs == null) {
160 try {
161 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
162 public FileSystem run() throws IOException, InterruptedException {
163 final Path sysDir = new Path(client.getSystemDir());
164 return sysDir.getFileSystem(getConf());
165 }
166 });
167 } catch (InterruptedException e) {
168 throw new RuntimeException(e);
169 }
170 }
171 return fs;
172 }
173
174 /**
175 * Get job corresponding to jobid.
176 *
177 * @param jobId
178 * @return object of {@link Job}
179 * @throws IOException
180 * @throws InterruptedException
181 */
182 public Job getJob(JobID jobId) throws IOException, InterruptedException {
183 JobStatus status = client.getJobStatus(jobId);
184 if (status != null) {
185 JobConf conf;
186 try {
187 conf = new JobConf(status.getJobFile());
188 } catch (RuntimeException ex) {
189 // If job file doesn't exist it means we can't find the job
190 if (ex.getCause() instanceof FileNotFoundException) {
191 return null;
192 } else {
193 throw ex;
194 }
195 }
196 return Job.getInstance(this, status, conf);
197 }
198 return null;
199 }
200
201 /**
202 * Get all the queues in cluster.
203 *
204 * @return array of {@link QueueInfo}
205 * @throws IOException
206 * @throws InterruptedException
207 */
208 public QueueInfo[] getQueues() throws IOException, InterruptedException {
209 return client.getQueues();
210 }
211
212 /**
213 * Get queue information for the specified name.
214 *
215 * @param name queuename
216 * @return object of {@link QueueInfo}
217 * @throws IOException
218 * @throws InterruptedException
219 */
220 public QueueInfo getQueue(String name)
221 throws IOException, InterruptedException {
222 return client.getQueue(name);
223 }
224
225 /**
226 * Get log parameters for the specified jobID or taskAttemptID
227 * @param jobID the job id.
228 * @param taskAttemptID the task attempt id. Optional.
229 * @return the LogParams
230 * @throws IOException
231 * @throws InterruptedException
232 */
233 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
234 throws IOException, InterruptedException {
235 return client.getLogFileParams(jobID, taskAttemptID);
236 }
237
238 /**
239 * Get current cluster status.
240 *
241 * @return object of {@link ClusterMetrics}
242 * @throws IOException
243 * @throws InterruptedException
244 */
245 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
246 return client.getClusterMetrics();
247 }
248
249 /**
250 * Get all active trackers in the cluster.
251 *
252 * @return array of {@link TaskTrackerInfo}
253 * @throws IOException
254 * @throws InterruptedException
255 */
256 public TaskTrackerInfo[] getActiveTaskTrackers()
257 throws IOException, InterruptedException {
258 return client.getActiveTrackers();
259 }
260
261 /**
262 * Get blacklisted trackers.
263 *
264 * @return array of {@link TaskTrackerInfo}
265 * @throws IOException
266 * @throws InterruptedException
267 */
268 public TaskTrackerInfo[] getBlackListedTaskTrackers()
269 throws IOException, InterruptedException {
270 return client.getBlacklistedTrackers();
271 }
272
273 /**
274 * Get all the jobs in cluster.
275 *
276 * @return array of {@link Job}
277 * @throws IOException
278 * @throws InterruptedException
279 * @deprecated Use {@link #getAllJobStatuses()} instead.
280 */
281 @Deprecated
282 public Job[] getAllJobs() throws IOException, InterruptedException {
283 return getJobs(client.getAllJobs());
284 }
285
286 /**
287 * Get job status for all jobs in the cluster.
288 * @return job status for all jobs in cluster
289 * @throws IOException
290 * @throws InterruptedException
291 */
292 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
293 return client.getAllJobs();
294 }
295
296 /**
297 * Grab the jobtracker system directory path where
298 * job-specific files will be placed.
299 *
300 * @return the system directory where job-specific files are to be placed.
301 */
302 public Path getSystemDir() throws IOException, InterruptedException {
303 if (sysDir == null) {
304 sysDir = new Path(client.getSystemDir());
305 }
306 return sysDir;
307 }
308
309 /**
310 * Grab the jobtracker's view of the staging directory path where
311 * job-specific files will be placed.
312 *
313 * @return the staging directory where job-specific files are to be placed.
314 */
315 public Path getStagingAreaDir() throws IOException, InterruptedException {
316 if (stagingAreaDir == null) {
317 stagingAreaDir = new Path(client.getStagingAreaDir());
318 }
319 return stagingAreaDir;
320 }
321
322 /**
323 * Get the job history file path for a given job id. The job history file at
324 * this path may or may not be existing depending on the job completion state.
325 * The file is present only for the completed jobs.
326 * @param jobId the JobID of the job submitted by the current user.
327 * @return the file path of the job history file
328 * @throws IOException
329 * @throws InterruptedException
330 */
331 public String getJobHistoryUrl(JobID jobId) throws IOException,
332 InterruptedException {
333 if (jobHistoryDir == null) {
334 jobHistoryDir = new Path(client.getJobHistoryDir());
335 }
336 return new Path(jobHistoryDir, jobId.toString() + "_"
337 + ugi.getShortUserName()).toString();
338 }
339
340 /**
341 * Gets the Queue ACLs for current user
342 * @return array of QueueAclsInfo object for current user.
343 * @throws IOException
344 */
345 public QueueAclsInfo[] getQueueAclsForCurrentUser()
346 throws IOException, InterruptedException {
347 return client.getQueueAclsForCurrentUser();
348 }
349
350 /**
351 * Gets the root level queues.
352 * @return array of JobQueueInfo object.
353 * @throws IOException
354 */
355 public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
356 return client.getRootQueues();
357 }
358
359 /**
360 * Returns immediate children of queueName.
361 * @param queueName
362 * @return array of JobQueueInfo which are children of queueName
363 * @throws IOException
364 */
365 public QueueInfo[] getChildQueues(String queueName)
366 throws IOException, InterruptedException {
367 return client.getChildQueues(queueName);
368 }
369
370 /**
371 * Get the JobTracker's status.
372 *
373 * @return {@link JobTrackerStatus} of the JobTracker
374 * @throws IOException
375 * @throws InterruptedException
376 */
377 public JobTrackerStatus getJobTrackerStatus() throws IOException,
378 InterruptedException {
379 return client.getJobTrackerStatus();
380 }
381
382 /**
383 * Get the tasktracker expiry interval for the cluster
384 * @return the expiry interval in msec
385 */
386 public long getTaskTrackerExpiryInterval() throws IOException,
387 InterruptedException {
388 return client.getTaskTrackerExpiryInterval();
389 }
390
391 /**
392 * Get a delegation token for the user from the JobTracker.
393 * @param renewer the user who can renew the token
394 * @return the new token
395 * @throws IOException
396 */
397 public Token<DelegationTokenIdentifier>
398 getDelegationToken(Text renewer) throws IOException, InterruptedException{
399 // client has already set the service
400 return client.getDelegationToken(renewer);
401 }
402
403 /**
404 * Renew a delegation token
405 * @param token the token to renew
406 * @return the new expiration time
407 * @throws InvalidToken
408 * @throws IOException
409 * @deprecated Use {@link Token#renew} instead
410 */
411 public long renewDelegationToken(Token<DelegationTokenIdentifier> token
412 ) throws InvalidToken, IOException,
413 InterruptedException {
414 return token.renew(getConf());
415 }
416
417 /**
418 * Cancel a delegation token from the JobTracker
419 * @param token the token to cancel
420 * @throws IOException
421 * @deprecated Use {@link Token#cancel} instead
422 */
423 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
424 ) throws IOException,
425 InterruptedException {
426 token.cancel(getConf());
427 }
428
429 }