/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
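 *
 * <p>A minimal usage sketch (exception handling omitted for brevity; every
 * call shown is a method declared in this class):
 * <pre>
 *   Cluster cluster = new Cluster(new Configuration());
 *   ClusterMetrics metrics = cluster.getClusterStatus();
 *   QueueInfo[] queues = cluster.getQueues();
 *   cluster.close();
 * </pre>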
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

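  // Register the MapReduce configuration resources (e.g. mapred-default.xml
  // and mapred-site.xml) and deprecated-key mappings before any Cluster
  // instance is created.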
  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

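    // Iterate over every ClientProtocolProvider discovered on the classpath
    // via ServiceLoader and pick the first one that can build a ClientProtocol
    // for this configuration; providers that do not apply return null (or
    // throw) and are skipped.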
    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          } else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the corresponding server addresses.");
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored.
   *
   * @return the {@link FileSystem} where job-specific files are stored
   * @throws IOException
   * @throws InterruptedException
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
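        // Resolve the file system as the current user (via UGI doAs) so that
        // that user's credentials are applied when the system directory's
        // FileSystem is contacted.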
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get the job corresponding to the given job id.
   *
   * @param jobId the job id
   * @return the {@link Job} object, or <code>null</code> if no such job is found
   * @throws IOException
   * @throws InterruptedException
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      JobConf conf;
      try {
        conf = new JobConf(status.getJobFile());
      } catch (RuntimeException ex) {
        // If the job file doesn't exist it means we can't find the job
        if (ex.getCause() instanceof FileNotFoundException) {
          return null;
        } else {
          throw ex;
        }
      }
      return Job.getInstance(this, status, conf);
    }
    return null;
  }

  /**
   * Get all the queues in the cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified queue name.
   *
   * @param name the queue name
   * @return object of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   *
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status.
   *
   * @return object of {@link ClusterMetrics}
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus()
      throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in the cluster.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   *
   * @return job status for all jobs in the cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses()
      throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed.
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not exist, depending on the job's completion state;
   * it is present only for completed jobs.
   *
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException
   * @throws InterruptedException
   */
  public String getJobHistoryUrl(JobID jobId)
      throws IOException, InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
                    + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the Queue ACLs for the current user.
   *
   * @return array of QueueAclsInfo object for the current user.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues.
   *
   * @return array of {@link QueueInfo} objects.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns the immediate children of the given queue.
   *
   * @param queueName the queue name
   * @return array of {@link QueueInfo} objects that are children of queueName
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus()
      throws IOException, InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster.
   *
   * @return the expiry interval in milliseconds
   * @throws IOException
   * @throws InterruptedException
   */
  public long getTaskTrackerExpiryInterval()
      throws IOException, InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   *
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   * @throws InterruptedException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException {
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token.
   *
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   *
   * @param token the token to cancel
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

}