001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceConfigurationError;
028import java.util.ServiceLoader;
029
030import com.google.common.annotations.VisibleForTesting;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapred.JobConf;
040import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
041import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
042import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
043import org.apache.hadoop.mapreduce.util.ConfigUtil;
044import org.apache.hadoop.mapreduce.v2.LogParams;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.hadoop.security.token.SecretManager.InvalidToken;
047import org.apache.hadoop.security.token.Token;
048
049/**
050 * Provides a way to access information about the map/reduce cluster.
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Evolving
054public class Cluster {
055  
056  @InterfaceStability.Evolving
057  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
058
059  private ClientProtocolProvider clientProtocolProvider;
060  private ClientProtocol client;
061  private UserGroupInformation ugi;
062  private Configuration conf;
063  private FileSystem fs = null;
064  private Path sysDir = null;
065  private Path stagingAreaDir = null;
066  private Path jobHistoryDir = null;
067  private static final Log LOG = LogFactory.getLog(Cluster.class);
068
069  @VisibleForTesting
070  static Iterable<ClientProtocolProvider> frameworkLoader =
071      ServiceLoader.load(ClientProtocolProvider.class);
072  private volatile List<ClientProtocolProvider> providerList = null;
073
074  private void initProviderList() {
075    if (providerList == null) {
076      synchronized (frameworkLoader) {
077        if (providerList == null) {
078          List<ClientProtocolProvider> localProviderList =
079              new ArrayList<ClientProtocolProvider>();
080          try {
081            for (ClientProtocolProvider provider : frameworkLoader) {
082              localProviderList.add(provider);
083            }
084          } catch(ServiceConfigurationError e) {
085            LOG.info("Failed to instantiate ClientProtocolProvider, please "
086                         + "check the /META-INF/services/org.apache."
087                         + "hadoop.mapreduce.protocol.ClientProtocolProvider "
088                         + "files on the classpath", e);
089          }
090          providerList = localProviderList;
091        }
092      }
093    }
094  }
095
096  static {
097    ConfigUtil.loadResources();
098  }
099  
100  public Cluster(Configuration conf) throws IOException {
101    this(null, conf);
102  }
103
104  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
105      throws IOException {
106    this.conf = conf;
107    this.ugi = UserGroupInformation.getCurrentUser();
108    initialize(jobTrackAddr, conf);
109  }
110  
111  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
112      throws IOException {
113
114    initProviderList();
115    for (ClientProtocolProvider provider : providerList) {
116      LOG.debug("Trying ClientProtocolProvider : "
117          + provider.getClass().getName());
118      ClientProtocol clientProtocol = null;
119      try {
120        if (jobTrackAddr == null) {
121          clientProtocol = provider.create(conf);
122        } else {
123          clientProtocol = provider.create(jobTrackAddr, conf);
124        }
125
126        if (clientProtocol != null) {
127          clientProtocolProvider = provider;
128          client = clientProtocol;
129          LOG.debug("Picked " + provider.getClass().getName()
130              + " as the ClientProtocolProvider");
131          break;
132        } else {
133          LOG.debug("Cannot pick " + provider.getClass().getName()
134              + " as the ClientProtocolProvider - returned null protocol");
135        }
136      } catch (Exception e) {
137        LOG.info("Failed to use " + provider.getClass().getName()
138            + " due to error: ", e);
139      }
140    }
141
142    if (null == clientProtocolProvider || null == client) {
143      throw new IOException(
144          "Cannot initialize Cluster. Please check your configuration for "
145              + MRConfig.FRAMEWORK_NAME
146              + " and the correspond server addresses.");
147    }
148  }
149
150  ClientProtocol getClient() {
151    return client;
152  }
153  
154  Configuration getConf() {
155    return conf;
156  }
157  
158  /**
159   * Close the <code>Cluster</code>.
160   * @throws IOException
161   */
162  public synchronized void close() throws IOException {
163    clientProtocolProvider.close(client);
164  }
165
166  private Job[] getJobs(JobStatus[] stats) throws IOException {
167    List<Job> jobs = new ArrayList<Job>();
168    for (JobStatus stat : stats) {
169      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
170    }
171    return jobs.toArray(new Job[0]);
172  }
173
174  /**
175   * Get the file system where job-specific files are stored
176   * 
177   * @return object of FileSystem
178   * @throws IOException
179   * @throws InterruptedException
180   */
181  public synchronized FileSystem getFileSystem() 
182      throws IOException, InterruptedException {
183    if (this.fs == null) {
184      try {
185        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
186          public FileSystem run() throws IOException, InterruptedException {
187            final Path sysDir = new Path(client.getSystemDir());
188            return sysDir.getFileSystem(getConf());
189          }
190        });
191      } catch (InterruptedException e) {
192        throw new RuntimeException(e);
193      }
194    }
195    return fs;
196  }
197
198  /**
199   * Get job corresponding to jobid.
200   * 
201   * @param jobId
202   * @return object of {@link Job}
203   * @throws IOException
204   * @throws InterruptedException
205   */
206  public Job getJob(JobID jobId) throws IOException, InterruptedException {
207    JobStatus status = client.getJobStatus(jobId);
208    if (status != null) {
209      JobConf conf;
210      try {
211        conf = new JobConf(status.getJobFile());
212      } catch (RuntimeException ex) {
213        // If job file doesn't exist it means we can't find the job
214        if (ex.getCause() instanceof FileNotFoundException) {
215          return null;
216        } else {
217          throw ex;
218        }
219      }
220      return Job.getInstance(this, status, conf);
221    }
222    return null;
223  }
224  
225  /**
226   * Get all the queues in cluster.
227   * 
228   * @return array of {@link QueueInfo}
229   * @throws IOException
230   * @throws InterruptedException
231   */
232  public QueueInfo[] getQueues() throws IOException, InterruptedException {
233    return client.getQueues();
234  }
235  
236  /**
237   * Get queue information for the specified name.
238   * 
239   * @param name queuename
240   * @return object of {@link QueueInfo}
241   * @throws IOException
242   * @throws InterruptedException
243   */
244  public QueueInfo getQueue(String name) 
245      throws IOException, InterruptedException {
246    return client.getQueue(name);
247  }
248
249  /**
250   * Get log parameters for the specified jobID or taskAttemptID
251   * @param jobID the job id.
252   * @param taskAttemptID the task attempt id. Optional.
253   * @return the LogParams
254   * @throws IOException
255   * @throws InterruptedException
256   */
257  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
258      throws IOException, InterruptedException {
259    return client.getLogFileParams(jobID, taskAttemptID);
260  }
261
262  /**
263   * Get current cluster status.
264   * 
265   * @return object of {@link ClusterMetrics}
266   * @throws IOException
267   * @throws InterruptedException
268   */
269  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
270    return client.getClusterMetrics();
271  }
272  
273  /**
274   * Get all active trackers in the cluster.
275   * 
276   * @return array of {@link TaskTrackerInfo}
277   * @throws IOException
278   * @throws InterruptedException
279   */
280  public TaskTrackerInfo[] getActiveTaskTrackers() 
281      throws IOException, InterruptedException  {
282    return client.getActiveTrackers();
283  }
284  
285  /**
286   * Get blacklisted trackers.
287   * 
288   * @return array of {@link TaskTrackerInfo}
289   * @throws IOException
290   * @throws InterruptedException
291   */
292  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
293      throws IOException, InterruptedException  {
294    return client.getBlacklistedTrackers();
295  }
296  
297  /**
298   * Get all the jobs in cluster.
299   * 
300   * @return array of {@link Job}
301   * @throws IOException
302   * @throws InterruptedException
303   * @deprecated Use {@link #getAllJobStatuses()} instead.
304   */
305  @Deprecated
306  public Job[] getAllJobs() throws IOException, InterruptedException {
307    return getJobs(client.getAllJobs());
308  }
309
310  /**
311   * Get job status for all jobs in the cluster.
312   * @return job status for all jobs in cluster
313   * @throws IOException
314   * @throws InterruptedException
315   */
316  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
317    return client.getAllJobs();
318  }
319
320  /**
321   * Grab the jobtracker system directory path where 
322   * job-specific files will  be placed.
323   * 
324   * @return the system directory where job-specific files are to be placed.
325   */
326  public Path getSystemDir() throws IOException, InterruptedException {
327    if (sysDir == null) {
328      sysDir = new Path(client.getSystemDir());
329    }
330    return sysDir;
331  }
332  
333  /**
334   * Grab the jobtracker's view of the staging directory path where 
335   * job-specific files will  be placed.
336   * 
337   * @return the staging directory where job-specific files are to be placed.
338   */
339  public Path getStagingAreaDir() throws IOException, InterruptedException {
340    if (stagingAreaDir == null) {
341      stagingAreaDir = new Path(client.getStagingAreaDir());
342    }
343    return stagingAreaDir;
344  }
345
346  /**
347   * Get the job history file path for a given job id. The job history file at 
348   * this path may or may not be existing depending on the job completion state.
349   * The file is present only for the completed jobs.
350   * @param jobId the JobID of the job submitted by the current user.
351   * @return the file path of the job history file
352   * @throws IOException
353   * @throws InterruptedException
354   */
355  public String getJobHistoryUrl(JobID jobId) throws IOException, 
356    InterruptedException {
357    if (jobHistoryDir == null) {
358      jobHistoryDir = new Path(client.getJobHistoryDir());
359    }
360    return new Path(jobHistoryDir, jobId.toString() + "_"
361                    + ugi.getShortUserName()).toString();
362  }
363
364  /**
365   * Gets the Queue ACLs for current user
366   * @return array of QueueAclsInfo object for current user.
367   * @throws IOException
368   */
369  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
370      throws IOException, InterruptedException  {
371    return client.getQueueAclsForCurrentUser();
372  }
373
374  /**
375   * Gets the root level queues.
376   * @return array of JobQueueInfo object.
377   * @throws IOException
378   */
379  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
380    return client.getRootQueues();
381  }
382  
383  /**
384   * Returns immediate children of queueName.
385   * @param queueName
386   * @return array of JobQueueInfo which are children of queueName
387   * @throws IOException
388   */
389  public QueueInfo[] getChildQueues(String queueName) 
390      throws IOException, InterruptedException {
391    return client.getChildQueues(queueName);
392  }
393  
394  /**
395   * Get the JobTracker's status.
396   * 
397   * @return {@link JobTrackerStatus} of the JobTracker
398   * @throws IOException
399   * @throws InterruptedException
400   */
401  public JobTrackerStatus getJobTrackerStatus() throws IOException,
402      InterruptedException {
403    return client.getJobTrackerStatus();
404  }
405  
406  /**
407   * Get the tasktracker expiry interval for the cluster
408   * @return the expiry interval in msec
409   */
410  public long getTaskTrackerExpiryInterval() throws IOException,
411      InterruptedException {
412    return client.getTaskTrackerExpiryInterval();
413  }
414
415  /**
416   * Get a delegation token for the user from the JobTracker.
417   * @param renewer the user who can renew the token
418   * @return the new token
419   * @throws IOException
420   */
421  public Token<DelegationTokenIdentifier> 
422      getDelegationToken(Text renewer) throws IOException, InterruptedException{
423    // client has already set the service
424    return client.getDelegationToken(renewer);
425  }
426
427  /**
428   * Renew a delegation token
429   * @param token the token to renew
430   * @return the new expiration time
431   * @throws InvalidToken
432   * @throws IOException
433   * @deprecated Use {@link Token#renew} instead
434   */
435  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
436                                   ) throws InvalidToken, IOException,
437                                            InterruptedException {
438    return token.renew(getConf());
439  }
440
441  /**
442   * Cancel a delegation token from the JobTracker
443   * @param token the token to cancel
444   * @throws IOException
445   * @deprecated Use {@link Token#cancel} instead
446   */
447  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
448                                    ) throws IOException,
449                                             InterruptedException {
450    token.cancel(getConf());
451  }
452
453}