001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.net.InetSocketAddress;
024    import java.security.PrivilegedExceptionAction;
025    import java.util.ArrayList;
026    import java.util.List;
027    import java.util.ServiceLoader;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.hadoop.classification.InterfaceAudience;
032    import org.apache.hadoop.classification.InterfaceStability;
033    import org.apache.hadoop.conf.Configuration;
034    import org.apache.hadoop.fs.FileSystem;
035    import org.apache.hadoop.fs.Path;
036    import org.apache.hadoop.io.Text;
037    import org.apache.hadoop.mapred.JobConf;
038    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039    import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041    import org.apache.hadoop.mapreduce.util.ConfigUtil;
042    import org.apache.hadoop.mapreduce.v2.LogParams;
043    import org.apache.hadoop.security.UserGroupInformation;
044    import org.apache.hadoop.security.token.SecretManager.InvalidToken;
045    import org.apache.hadoop.security.token.Token;
046    
047    /**
048     * Provides a way to access information about the map/reduce cluster.
049     */
050    @InterfaceAudience.Public
051    @InterfaceStability.Evolving
052    public class Cluster {
053      
054      @InterfaceStability.Evolving
055      public static enum JobTrackerStatus {INITIALIZING, RUNNING};
056      
057      private ClientProtocolProvider clientProtocolProvider;
058      private ClientProtocol client;
059      private UserGroupInformation ugi;
060      private Configuration conf;
061      private FileSystem fs = null;
062      private Path sysDir = null;
063      private Path stagingAreaDir = null;
064      private Path jobHistoryDir = null;
065      private static final Log LOG = LogFactory.getLog(Cluster.class);
066    
067      private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
068          ServiceLoader.load(ClientProtocolProvider.class);
069      
070      static {
071        ConfigUtil.loadResources();
072      }
073      
074      public Cluster(Configuration conf) throws IOException {
075        this(null, conf);
076      }
077    
078      public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
079          throws IOException {
080        this.conf = conf;
081        this.ugi = UserGroupInformation.getCurrentUser();
082        initialize(jobTrackAddr, conf);
083      }
084      
085      private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
086          throws IOException {
087    
088        synchronized (frameworkLoader) {
089          for (ClientProtocolProvider provider : frameworkLoader) {
090            LOG.debug("Trying ClientProtocolProvider : "
091                + provider.getClass().getName());
092            ClientProtocol clientProtocol = null; 
093            try {
094              if (jobTrackAddr == null) {
095                clientProtocol = provider.create(conf);
096              } else {
097                clientProtocol = provider.create(jobTrackAddr, conf);
098              }
099    
100              if (clientProtocol != null) {
101                clientProtocolProvider = provider;
102                client = clientProtocol;
103                LOG.debug("Picked " + provider.getClass().getName()
104                    + " as the ClientProtocolProvider");
105                break;
106              }
107              else {
108                LOG.debug("Cannot pick " + provider.getClass().getName()
109                    + " as the ClientProtocolProvider - returned null protocol");
110              }
111            } 
112            catch (Exception e) {
113              LOG.info("Failed to use " + provider.getClass().getName()
114                  + " due to error: " + e.getMessage());
115            }
116          }
117        }
118    
119        if (null == clientProtocolProvider || null == client) {
120          throw new IOException(
121              "Cannot initialize Cluster. Please check your configuration for "
122                  + MRConfig.FRAMEWORK_NAME
123                  + " and the correspond server addresses.");
124        }
125      }
126    
127      ClientProtocol getClient() {
128        return client;
129      }
130      
131      Configuration getConf() {
132        return conf;
133      }
134      
135      /**
136       * Close the <code>Cluster</code>.
137       */
138      public synchronized void close() throws IOException {
139        clientProtocolProvider.close(client);
140      }
141    
142      private Job[] getJobs(JobStatus[] stats) throws IOException {
143        List<Job> jobs = new ArrayList<Job>();
144        for (JobStatus stat : stats) {
145          jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
146        }
147        return jobs.toArray(new Job[0]);
148      }
149    
150      /**
151       * Get the file system where job-specific files are stored
152       * 
153       * @return object of FileSystem
154       * @throws IOException
155       * @throws InterruptedException
156       */
157      public synchronized FileSystem getFileSystem() 
158          throws IOException, InterruptedException {
159        if (this.fs == null) {
160          try {
161            this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
162              public FileSystem run() throws IOException, InterruptedException {
163                final Path sysDir = new Path(client.getSystemDir());
164                return sysDir.getFileSystem(getConf());
165              }
166            });
167          } catch (InterruptedException e) {
168            throw new RuntimeException(e);
169          }
170        }
171        return fs;
172      }
173    
174      /**
175       * Get job corresponding to jobid.
176       * 
177       * @param jobId
178       * @return object of {@link Job}
179       * @throws IOException
180       * @throws InterruptedException
181       */
182      public Job getJob(JobID jobId) throws IOException, InterruptedException {
183        JobStatus status = client.getJobStatus(jobId);
184        if (status != null) {
185          JobConf conf;
186          try {
187            conf = new JobConf(status.getJobFile());
188          } catch (RuntimeException ex) {
189            // If job file doesn't exist it means we can't find the job
190            if (ex.getCause() instanceof FileNotFoundException) {
191              return null;
192            } else {
193              throw ex;
194            }
195          }
196          return Job.getInstance(this, status, conf);
197        }
198        return null;
199      }
200      
201      /**
202       * Get all the queues in cluster.
203       * 
204       * @return array of {@link QueueInfo}
205       * @throws IOException
206       * @throws InterruptedException
207       */
208      public QueueInfo[] getQueues() throws IOException, InterruptedException {
209        return client.getQueues();
210      }
211      
212      /**
213       * Get queue information for the specified name.
214       * 
215       * @param name queuename
216       * @return object of {@link QueueInfo}
217       * @throws IOException
218       * @throws InterruptedException
219       */
220      public QueueInfo getQueue(String name) 
221          throws IOException, InterruptedException {
222        return client.getQueue(name);
223      }
224    
225      /**
226       * Get log parameters for the specified jobID or taskAttemptID
227       * @param jobID the job id.
228       * @param taskAttemptID the task attempt id. Optional.
229       * @return the LogParams
230       * @throws IOException
231       * @throws InterruptedException
232       */
233      public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
234          throws IOException, InterruptedException {
235        return client.getLogFileParams(jobID, taskAttemptID);
236      }
237    
238      /**
239       * Get current cluster status.
240       * 
241       * @return object of {@link ClusterMetrics}
242       * @throws IOException
243       * @throws InterruptedException
244       */
245      public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
246        return client.getClusterMetrics();
247      }
248      
249      /**
250       * Get all active trackers in the cluster.
251       * 
252       * @return array of {@link TaskTrackerInfo}
253       * @throws IOException
254       * @throws InterruptedException
255       */
256      public TaskTrackerInfo[] getActiveTaskTrackers() 
257          throws IOException, InterruptedException  {
258        return client.getActiveTrackers();
259      }
260      
261      /**
262       * Get blacklisted trackers.
263       * 
264       * @return array of {@link TaskTrackerInfo}
265       * @throws IOException
266       * @throws InterruptedException
267       */
268      public TaskTrackerInfo[] getBlackListedTaskTrackers() 
269          throws IOException, InterruptedException  {
270        return client.getBlacklistedTrackers();
271      }
272      
273      /**
274       * Get all the jobs in cluster.
275       * 
276       * @return array of {@link Job}
277       * @throws IOException
278       * @throws InterruptedException
279       * @deprecated Use {@link #getAllJobStatuses()} instead.
280       */
281      @Deprecated
282      public Job[] getAllJobs() throws IOException, InterruptedException {
283        return getJobs(client.getAllJobs());
284      }
285    
286      /**
287       * Get job status for all jobs in the cluster.
288       * @return job status for all jobs in cluster
289       * @throws IOException
290       * @throws InterruptedException
291       */
292      public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
293        return client.getAllJobs();
294      }
295    
296      /**
297       * Grab the jobtracker system directory path where 
298       * job-specific files will  be placed.
299       * 
300       * @return the system directory where job-specific files are to be placed.
301       */
302      public Path getSystemDir() throws IOException, InterruptedException {
303        if (sysDir == null) {
304          sysDir = new Path(client.getSystemDir());
305        }
306        return sysDir;
307      }
308      
309      /**
310       * Grab the jobtracker's view of the staging directory path where 
311       * job-specific files will  be placed.
312       * 
313       * @return the staging directory where job-specific files are to be placed.
314       */
315      public Path getStagingAreaDir() throws IOException, InterruptedException {
316        if (stagingAreaDir == null) {
317          stagingAreaDir = new Path(client.getStagingAreaDir());
318        }
319        return stagingAreaDir;
320      }
321    
322      /**
323       * Get the job history file path for a given job id. The job history file at 
324       * this path may or may not be existing depending on the job completion state.
325       * The file is present only for the completed jobs.
326       * @param jobId the JobID of the job submitted by the current user.
327       * @return the file path of the job history file
328       * @throws IOException
329       * @throws InterruptedException
330       */
331      public String getJobHistoryUrl(JobID jobId) throws IOException, 
332        InterruptedException {
333        if (jobHistoryDir == null) {
334          jobHistoryDir = new Path(client.getJobHistoryDir());
335        }
336        return new Path(jobHistoryDir, jobId.toString() + "_"
337                        + ugi.getShortUserName()).toString();
338      }
339    
340      /**
341       * Gets the Queue ACLs for current user
342       * @return array of QueueAclsInfo object for current user.
343       * @throws IOException
344       */
345      public QueueAclsInfo[] getQueueAclsForCurrentUser() 
346          throws IOException, InterruptedException  {
347        return client.getQueueAclsForCurrentUser();
348      }
349    
350      /**
351       * Gets the root level queues.
352       * @return array of JobQueueInfo object.
353       * @throws IOException
354       */
355      public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
356        return client.getRootQueues();
357      }
358      
359      /**
360       * Returns immediate children of queueName.
361       * @param queueName
362       * @return array of JobQueueInfo which are children of queueName
363       * @throws IOException
364       */
365      public QueueInfo[] getChildQueues(String queueName) 
366          throws IOException, InterruptedException {
367        return client.getChildQueues(queueName);
368      }
369      
370      /**
371       * Get the JobTracker's status.
372       * 
373       * @return {@link JobTrackerStatus} of the JobTracker
374       * @throws IOException
375       * @throws InterruptedException
376       */
377      public JobTrackerStatus getJobTrackerStatus() throws IOException,
378          InterruptedException {
379        return client.getJobTrackerStatus();
380      }
381      
382      /**
383       * Get the tasktracker expiry interval for the cluster
384       * @return the expiry interval in msec
385       */
386      public long getTaskTrackerExpiryInterval() throws IOException,
387          InterruptedException {
388        return client.getTaskTrackerExpiryInterval();
389      }
390    
391      /**
392       * Get a delegation token for the user from the JobTracker.
393       * @param renewer the user who can renew the token
394       * @return the new token
395       * @throws IOException
396       */
397      public Token<DelegationTokenIdentifier> 
398          getDelegationToken(Text renewer) throws IOException, InterruptedException{
399        // client has already set the service
400        return client.getDelegationToken(renewer);
401      }
402    
403      /**
404       * Renew a delegation token
405       * @param token the token to renew
406       * @return the new expiration time
407       * @throws InvalidToken
408       * @throws IOException
409       * @deprecated Use {@link Token#renew} instead
410       */
411      public long renewDelegationToken(Token<DelegationTokenIdentifier> token
412                                       ) throws InvalidToken, IOException,
413                                                InterruptedException {
414        return token.renew(getConf());
415      }
416    
417      /**
418       * Cancel a delegation token from the JobTracker
419       * @param token the token to cancel
420       * @throws IOException
421       * @deprecated Use {@link Token#cancel} instead
422       */
423      public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
424                                        ) throws IOException,
425                                                 InterruptedException {
426        token.cancel(getConf());
427      }
428    
429    }