001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.ServiceConfigurationError; 028import java.util.ServiceLoader; 029 030import com.google.common.annotations.VisibleForTesting; 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.apache.hadoop.classification.InterfaceAudience; 034import org.apache.hadoop.classification.InterfaceStability; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.io.Text; 039import org.apache.hadoop.mapred.JobConf; 040import org.apache.hadoop.mapreduce.protocol.ClientProtocol; 041import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; 042import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 043import org.apache.hadoop.mapreduce.util.ConfigUtil; 044import org.apache.hadoop.mapreduce.v2.LogParams; 045import org.apache.hadoop.security.UserGroupInformation; 046import org.apache.hadoop.security.token.SecretManager.InvalidToken; 047import org.apache.hadoop.security.token.Token; 048 049/** 050 * Provides a way to access information about the map/reduce cluster. 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Evolving 054public class Cluster { 055 056 @InterfaceStability.Evolving 057 public static enum JobTrackerStatus {INITIALIZING, RUNNING}; 058 059 private ClientProtocolProvider clientProtocolProvider; 060 private ClientProtocol client; 061 private UserGroupInformation ugi; 062 private Configuration conf; 063 private FileSystem fs = null; 064 private Path sysDir = null; 065 private Path stagingAreaDir = null; 066 private Path jobHistoryDir = null; 067 private static final Log LOG = LogFactory.getLog(Cluster.class); 068 069 @VisibleForTesting 070 static Iterable<ClientProtocolProvider> frameworkLoader = 071 ServiceLoader.load(ClientProtocolProvider.class); 072 private volatile List<ClientProtocolProvider> providerList = null; 073 074 private void initProviderList() { 075 if (providerList == null) { 076 synchronized (frameworkLoader) { 077 if (providerList == null) { 078 List<ClientProtocolProvider> localProviderList = 079 new ArrayList<ClientProtocolProvider>(); 080 try { 081 for (ClientProtocolProvider provider : frameworkLoader) { 082 localProviderList.add(provider); 083 } 084 } catch(ServiceConfigurationError e) { 085 LOG.info("Failed to instantiate ClientProtocolProvider, please " 086 + "check the /META-INF/services/org.apache." 087 + "hadoop.mapreduce.protocol.ClientProtocolProvider " 088 + "files on the classpath", e); 089 } 090 providerList = localProviderList; 091 } 092 } 093 } 094 } 095 096 static { 097 ConfigUtil.loadResources(); 098 } 099 100 public Cluster(Configuration conf) throws IOException { 101 this(null, conf); 102 } 103 104 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 105 throws IOException { 106 this.conf = conf; 107 this.ugi = UserGroupInformation.getCurrentUser(); 108 initialize(jobTrackAddr, conf); 109 } 110 111 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) 112 throws IOException { 113 114 initProviderList(); 115 for (ClientProtocolProvider provider : providerList) { 116 LOG.debug("Trying ClientProtocolProvider : " 117 + provider.getClass().getName()); 118 ClientProtocol clientProtocol = null; 119 try { 120 if (jobTrackAddr == null) { 121 clientProtocol = provider.create(conf); 122 } else { 123 clientProtocol = provider.create(jobTrackAddr, conf); 124 } 125 126 if (clientProtocol != null) { 127 clientProtocolProvider = provider; 128 client = clientProtocol; 129 LOG.debug("Picked " + provider.getClass().getName() 130 + " as the ClientProtocolProvider"); 131 break; 132 } else { 133 LOG.debug("Cannot pick " + provider.getClass().getName() 134 + " as the ClientProtocolProvider - returned null protocol"); 135 } 136 } catch (Exception e) { 137 LOG.info("Failed to use " + provider.getClass().getName() 138 + " due to error: ", e); 139 } 140 } 141 142 if (null == clientProtocolProvider || null == client) { 143 throw new IOException( 144 "Cannot initialize Cluster. Please check your configuration for " 145 + MRConfig.FRAMEWORK_NAME 146 + " and the correspond server addresses."); 147 } 148 } 149 150 ClientProtocol getClient() { 151 return client; 152 } 153 154 Configuration getConf() { 155 return conf; 156 } 157 158 /** 159 * Close the <code>Cluster</code>. 160 * @throws IOException 161 */ 162 public synchronized void close() throws IOException { 163 clientProtocolProvider.close(client); 164 } 165 166 private Job[] getJobs(JobStatus[] stats) throws IOException { 167 List<Job> jobs = new ArrayList<Job>(); 168 for (JobStatus stat : stats) { 169 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile()))); 170 } 171 return jobs.toArray(new Job[0]); 172 } 173 174 /** 175 * Get the file system where job-specific files are stored 176 * 177 * @return object of FileSystem 178 * @throws IOException 179 * @throws InterruptedException 180 */ 181 public synchronized FileSystem getFileSystem() 182 throws IOException, InterruptedException { 183 if (this.fs == null) { 184 try { 185 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 186 public FileSystem run() throws IOException, InterruptedException { 187 final Path sysDir = new Path(client.getSystemDir()); 188 return sysDir.getFileSystem(getConf()); 189 } 190 }); 191 } catch (InterruptedException e) { 192 throw new RuntimeException(e); 193 } 194 } 195 return fs; 196 } 197 198 /** 199 * Get job corresponding to jobid. 200 * 201 * @param jobId 202 * @return object of {@link Job} 203 * @throws IOException 204 * @throws InterruptedException 205 */ 206 public Job getJob(JobID jobId) throws IOException, InterruptedException { 207 JobStatus status = client.getJobStatus(jobId); 208 if (status != null) { 209 JobConf conf; 210 try { 211 conf = new JobConf(status.getJobFile()); 212 } catch (RuntimeException ex) { 213 // If job file doesn't exist it means we can't find the job 214 if (ex.getCause() instanceof FileNotFoundException) { 215 return null; 216 } else { 217 throw ex; 218 } 219 } 220 return Job.getInstance(this, status, conf); 221 } 222 return null; 223 } 224 225 /** 226 * Get all the queues in cluster. 227 * 228 * @return array of {@link QueueInfo} 229 * @throws IOException 230 * @throws InterruptedException 231 */ 232 public QueueInfo[] getQueues() throws IOException, InterruptedException { 233 return client.getQueues(); 234 } 235 236 /** 237 * Get queue information for the specified name. 238 * 239 * @param name queuename 240 * @return object of {@link QueueInfo} 241 * @throws IOException 242 * @throws InterruptedException 243 */ 244 public QueueInfo getQueue(String name) 245 throws IOException, InterruptedException { 246 return client.getQueue(name); 247 } 248 249 /** 250 * Get log parameters for the specified jobID or taskAttemptID 251 * @param jobID the job id. 252 * @param taskAttemptID the task attempt id. Optional. 253 * @return the LogParams 254 * @throws IOException 255 * @throws InterruptedException 256 */ 257 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID) 258 throws IOException, InterruptedException { 259 return client.getLogFileParams(jobID, taskAttemptID); 260 } 261 262 /** 263 * Get current cluster status. 264 * 265 * @return object of {@link ClusterMetrics} 266 * @throws IOException 267 * @throws InterruptedException 268 */ 269 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException { 270 return client.getClusterMetrics(); 271 } 272 273 /** 274 * Get all active trackers in the cluster. 275 * 276 * @return array of {@link TaskTrackerInfo} 277 * @throws IOException 278 * @throws InterruptedException 279 */ 280 public TaskTrackerInfo[] getActiveTaskTrackers() 281 throws IOException, InterruptedException { 282 return client.getActiveTrackers(); 283 } 284 285 /** 286 * Get blacklisted trackers. 287 * 288 * @return array of {@link TaskTrackerInfo} 289 * @throws IOException 290 * @throws InterruptedException 291 */ 292 public TaskTrackerInfo[] getBlackListedTaskTrackers() 293 throws IOException, InterruptedException { 294 return client.getBlacklistedTrackers(); 295 } 296 297 /** 298 * Get all the jobs in cluster. 299 * 300 * @return array of {@link Job} 301 * @throws IOException 302 * @throws InterruptedException 303 * @deprecated Use {@link #getAllJobStatuses()} instead. 304 */ 305 @Deprecated 306 public Job[] getAllJobs() throws IOException, InterruptedException { 307 return getJobs(client.getAllJobs()); 308 } 309 310 /** 311 * Get job status for all jobs in the cluster. 312 * @return job status for all jobs in cluster 313 * @throws IOException 314 * @throws InterruptedException 315 */ 316 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException { 317 return client.getAllJobs(); 318 } 319 320 /** 321 * Grab the jobtracker system directory path where 322 * job-specific files will be placed. 323 * 324 * @return the system directory where job-specific files are to be placed. 325 */ 326 public Path getSystemDir() throws IOException, InterruptedException { 327 if (sysDir == null) { 328 sysDir = new Path(client.getSystemDir()); 329 } 330 return sysDir; 331 } 332 333 /** 334 * Grab the jobtracker's view of the staging directory path where 335 * job-specific files will be placed. 336 * 337 * @return the staging directory where job-specific files are to be placed. 338 */ 339 public Path getStagingAreaDir() throws IOException, InterruptedException { 340 if (stagingAreaDir == null) { 341 stagingAreaDir = new Path(client.getStagingAreaDir()); 342 } 343 return stagingAreaDir; 344 } 345 346 /** 347 * Get the job history file path for a given job id. The job history file at 348 * this path may or may not be existing depending on the job completion state. 349 * The file is present only for the completed jobs. 350 * @param jobId the JobID of the job submitted by the current user. 351 * @return the file path of the job history file 352 * @throws IOException 353 * @throws InterruptedException 354 */ 355 public String getJobHistoryUrl(JobID jobId) throws IOException, 356 InterruptedException { 357 if (jobHistoryDir == null) { 358 jobHistoryDir = new Path(client.getJobHistoryDir()); 359 } 360 return new Path(jobHistoryDir, jobId.toString() + "_" 361 + ugi.getShortUserName()).toString(); 362 } 363 364 /** 365 * Gets the Queue ACLs for current user 366 * @return array of QueueAclsInfo object for current user. 367 * @throws IOException 368 */ 369 public QueueAclsInfo[] getQueueAclsForCurrentUser() 370 throws IOException, InterruptedException { 371 return client.getQueueAclsForCurrentUser(); 372 } 373 374 /** 375 * Gets the root level queues. 376 * @return array of JobQueueInfo object. 377 * @throws IOException 378 */ 379 public QueueInfo[] getRootQueues() throws IOException, InterruptedException { 380 return client.getRootQueues(); 381 } 382 383 /** 384 * Returns immediate children of queueName. 385 * @param queueName 386 * @return array of JobQueueInfo which are children of queueName 387 * @throws IOException 388 */ 389 public QueueInfo[] getChildQueues(String queueName) 390 throws IOException, InterruptedException { 391 return client.getChildQueues(queueName); 392 } 393 394 /** 395 * Get the JobTracker's status. 396 * 397 * @return {@link JobTrackerStatus} of the JobTracker 398 * @throws IOException 399 * @throws InterruptedException 400 */ 401 public JobTrackerStatus getJobTrackerStatus() throws IOException, 402 InterruptedException { 403 return client.getJobTrackerStatus(); 404 } 405 406 /** 407 * Get the tasktracker expiry interval for the cluster 408 * @return the expiry interval in msec 409 */ 410 public long getTaskTrackerExpiryInterval() throws IOException, 411 InterruptedException { 412 return client.getTaskTrackerExpiryInterval(); 413 } 414 415 /** 416 * Get a delegation token for the user from the JobTracker. 417 * @param renewer the user who can renew the token 418 * @return the new token 419 * @throws IOException 420 */ 421 public Token<DelegationTokenIdentifier> 422 getDelegationToken(Text renewer) throws IOException, InterruptedException{ 423 // client has already set the service 424 return client.getDelegationToken(renewer); 425 } 426 427 /** 428 * Renew a delegation token 429 * @param token the token to renew 430 * @return the new expiration time 431 * @throws InvalidToken 432 * @throws IOException 433 * @deprecated Use {@link Token#renew} instead 434 */ 435 public long renewDelegationToken(Token<DelegationTokenIdentifier> token 436 ) throws InvalidToken, IOException, 437 InterruptedException { 438 return token.renew(getConf()); 439 } 440 441 /** 442 * Cancel a delegation token from the JobTracker 443 * @param token the token to cancel 444 * @throws IOException 445 * @deprecated Use {@link Token#cancel} instead 446 */ 447 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token 448 ) throws IOException, 449 InterruptedException { 450 token.cancel(getConf()); 451 } 452 453}