001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.URL;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.Text;
035import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
036import org.apache.hadoop.mapreduce.Cluster;
037import org.apache.hadoop.mapreduce.ClusterMetrics;
038import org.apache.hadoop.mapreduce.Job;
039import org.apache.hadoop.mapreduce.QueueInfo;
040import org.apache.hadoop.mapreduce.TaskTrackerInfo;
041import org.apache.hadoop.mapreduce.TaskType;
042import org.apache.hadoop.mapreduce.filecache.DistributedCache;
043import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
044import org.apache.hadoop.mapreduce.tools.CLI;
045import org.apache.hadoop.mapreduce.util.ConfigUtil;
046import org.apache.hadoop.security.UserGroupInformation;
047import org.apache.hadoop.security.token.SecretManager.InvalidToken;
048import org.apache.hadoop.security.token.Token;
049import org.apache.hadoop.security.token.TokenRenewer;
050import org.apache.hadoop.util.Tool;
051import org.apache.hadoop.util.ToolRunner;
052
053/**
054 * <code>JobClient</code> is the primary interface for the user-job to interact
055 * with the cluster.
056 * 
057 * <code>JobClient</code> provides facilities to submit jobs, track their 
058 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
059 * status information etc.
060 * 
061 * <p>The job submission process involves:
062 * <ol>
063 *   <li>
064 *   Checking the input and output specifications of the job.
065 *   </li>
066 *   <li>
067 *   Computing the {@link InputSplit}s for the job.
068 *   </li>
069 *   <li>
070 *   Setup the requisite accounting information for the {@link DistributedCache} 
071 *   of the job, if necessary.
072 *   </li>
073 *   <li>
074 *   Copying the job's jar and configuration to the map-reduce system directory 
075 *   on the distributed file-system. 
076 *   </li>
077 *   <li>
078 *   Submitting the job to the cluster and optionally monitoring
079 *   it's status.
080 *   </li>
081 * </ol></p>
082 *  
083 * Normally the user creates the application, describes various facets of the
084 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
085 * the job and monitor its progress.
086 * 
087 * <p>Here is an example on how to use <code>JobClient</code>:</p>
088 * <p><blockquote><pre>
089 *     // Create a new JobConf
090 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
091 *     
092 *     // Specify various job-specific parameters     
093 *     job.setJobName("myjob");
094 *     
095 *     job.setInputPath(new Path("in"));
096 *     job.setOutputPath(new Path("out"));
097 *     
098 *     job.setMapperClass(MyJob.MyMapper.class);
099 *     job.setReducerClass(MyJob.MyReducer.class);
100 *
101 *     // Submit the job, then poll for progress until the job is complete
102 *     JobClient.runJob(job);
103 * </pre></blockquote></p>
104 * 
105 * <h4 id="JobControl">Job Control</h4>
106 * 
107 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 
108 * which cannot be done via a single map-reduce job. This is fairly easy since 
109 * the output of the job, typically, goes to distributed file-system and that 
110 * can be used as the input for the next job.</p>
111 * 
112 * <p>However, this also means that the onus on ensuring jobs are complete 
113 * (success/failure) lies squarely on the clients. In such situations the 
114 * various job-control options are:
115 * <ol>
116 *   <li>
117 *   {@link #runJob(JobConf)} : submits the job and returns only after 
118 *   the job has completed.
119 *   </li>
120 *   <li>
121 *   {@link #submitJob(JobConf)} : only submits the job, then poll the 
122 *   returned handle to the {@link RunningJob} to query status and make 
123 *   scheduling decisions.
124 *   </li>
125 *   <li>
126 *   {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
127 *   on job-completion, thus avoiding polling.
128 *   </li>
129 * </ol></p>
130 * 
131 * @see JobConf
132 * @see ClusterStatus
133 * @see Tool
134 * @see DistributedCache
135 */
136@InterfaceAudience.Public
137@InterfaceStability.Stable
138public class JobClient extends CLI {
139  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
140  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
141  /* notes that get delegation token was called. Again this is hack for oozie 
142   * to make sure we add history server delegation tokens to the credentials
143   *  for the job. Since the api only allows one delegation token to be returned, 
144   *  we have to add this hack.
145   */
146  private boolean getDelegationTokenCalled = false;
147  /* do we need a HS delegation token for this client */
148  static final String HS_DELEGATION_TOKEN_REQUIRED 
149      = "mapreduce.history.server.delegationtoken.required";
150  
151  static{
152    ConfigUtil.loadResources();
153  }
154
155  /**
156   * A NetworkedJob is an implementation of RunningJob.  It holds
157   * a JobProfile object to provide some info, and interacts with the
158   * remote service to provide certain functionality.
159   */
160  static class NetworkedJob implements RunningJob {
161    Job job;
162    /**
163     * We store a JobProfile and a timestamp for when we last
164     * acquired the job profile.  If the job is null, then we cannot
165     * perform any of the tasks.  The job might be null if the cluster
166     * has completely forgotten about the job.  (eg, 24 hours after the
167     * job completes.)
168     */
169    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
170      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
171    }
172
173    public NetworkedJob(Job job) throws IOException {
174      this.job = job;
175    }
176
177    public Configuration getConfiguration() {
178      return job.getConfiguration();
179    }
180
181    /**
182     * An identifier for the job
183     */
184    public JobID getID() {
185      return JobID.downgrade(job.getJobID());
186    }
187    
188    /** @deprecated This method is deprecated and will be removed. Applications should 
189     * rather use {@link #getID()}.*/
190    @Deprecated
191    public String getJobID() {
192      return getID().toString();
193    }
194    
195    /**
196     * The user-specified job name
197     */
198    public String getJobName() {
199      return job.getJobName();
200    }
201
202    /**
203     * The name of the job file
204     */
205    public String getJobFile() {
206      return job.getJobFile();
207    }
208
209    /**
210     * A URL where the job's status can be seen
211     */
212    public String getTrackingURL() {
213      return job.getTrackingURL();
214    }
215
216    /**
217     * A float between 0.0 and 1.0, indicating the % of map work
218     * completed.
219     */
220    public float mapProgress() throws IOException {
221      try {
222        return job.mapProgress();
223      } catch (InterruptedException ie) {
224        throw new IOException(ie);
225      }
226    }
227
228    /**
229     * A float between 0.0 and 1.0, indicating the % of reduce work
230     * completed.
231     */
232    public float reduceProgress() throws IOException {
233      try {
234        return job.reduceProgress();
235      } catch (InterruptedException ie) {
236        throw new IOException(ie);
237      }
238    }
239
240    /**
241     * A float between 0.0 and 1.0, indicating the % of cleanup work
242     * completed.
243     */
244    public float cleanupProgress() throws IOException {
245      try {
246        return job.cleanupProgress();
247      } catch (InterruptedException ie) {
248        throw new IOException(ie);
249      }
250    }
251
252    /**
253     * A float between 0.0 and 1.0, indicating the % of setup work
254     * completed.
255     */
256    public float setupProgress() throws IOException {
257      try {
258        return job.setupProgress();
259      } catch (InterruptedException ie) {
260        throw new IOException(ie);
261      }
262    }
263
264    /**
265     * Returns immediately whether the whole job is done yet or not.
266     */
267    public synchronized boolean isComplete() throws IOException {
268      try {
269        return job.isComplete();
270      } catch (InterruptedException ie) {
271        throw new IOException(ie);
272      }
273    }
274
275    /**
276     * True iff job completed successfully.
277     */
278    public synchronized boolean isSuccessful() throws IOException {
279      try {
280        return job.isSuccessful();
281      } catch (InterruptedException ie) {
282        throw new IOException(ie);
283      }
284    }
285
286    /**
287     * Blocks until the job is finished
288     */
289    public void waitForCompletion() throws IOException {
290      try {
291        job.waitForCompletion(false);
292      } catch (InterruptedException ie) {
293        throw new IOException(ie);
294      } catch (ClassNotFoundException ce) {
295        throw new IOException(ce);
296      }
297    }
298
299    /**
300     * Tells the service to get the state of the current job.
301     */
302    public synchronized int getJobState() throws IOException {
303      try {
304        return job.getJobState().getValue();
305      } catch (InterruptedException ie) {
306        throw new IOException(ie);
307      }
308    }
309    
310    /**
311     * Tells the service to terminate the current job.
312     */
313    public synchronized void killJob() throws IOException {
314      try {
315        job.killJob();
316      } catch (InterruptedException ie) {
317        throw new IOException(ie);
318      }
319    }
320   
321    
322    /** Set the priority of the job.
323    * @param priority new priority of the job. 
324    */
325    public synchronized void setJobPriority(String priority) 
326                                                throws IOException {
327      try {
328        job.setPriority(
329          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
330      } catch (InterruptedException ie) {
331        throw new IOException(ie);
332      }
333    }
334    
335    /**
336     * Kill indicated task attempt.
337     * @param taskId the id of the task to kill.
338     * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
339     * it is just killed, w/o affecting job failure status.
340     */
341    public synchronized void killTask(TaskAttemptID taskId,
342        boolean shouldFail) throws IOException {
343      try {
344        if (shouldFail) {
345          job.failTask(taskId);
346        } else {
347          job.killTask(taskId);
348        }
349      } catch (InterruptedException ie) {
350        throw new IOException(ie);
351      }
352    }
353
354    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
355    @Deprecated
356    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
357      killTask(TaskAttemptID.forName(taskId), shouldFail);
358    }
359    
360    /**
361     * Fetch task completion events from cluster for this job. 
362     */
363    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
364        int startFrom) throws IOException {
365      try {
366        org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls = 
367          job.getTaskCompletionEvents(startFrom, 10);
368        TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
369        for (int i = 0 ; i < acls.length; i++ ) {
370          ret[i] = TaskCompletionEvent.downgrade(acls[i]);
371        }
372        return ret;
373      } catch (InterruptedException ie) {
374        throw new IOException(ie);
375      }
376    }
377
378    /**
379     * Dump stats to screen
380     */
381    @Override
382    public String toString() {
383      return job.toString();
384    }
385        
386    /**
387     * Returns the counters for this job
388     */
389    public Counters getCounters() throws IOException {
390      try { 
391        Counters result = null;
392        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
393        if(temp != null) {
394          result = Counters.downgrade(temp);
395        }
396        return result;
397      } catch (InterruptedException ie) {
398        throw new IOException(ie);
399      }
400    }
401    
402    @Override
403    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
404      try { 
405        return job.getTaskDiagnostics(id);
406      } catch (InterruptedException ie) {
407        throw new IOException(ie);
408      }
409    }
410
411    public String getHistoryUrl() throws IOException {
412      try {
413        return job.getHistoryUrl();
414      } catch (InterruptedException ie) {
415        throw new IOException(ie);
416      }
417    }
418
419    public boolean isRetired() throws IOException {
420      try {
421        return job.isRetired();
422      } catch (InterruptedException ie) {
423        throw new IOException(ie);
424      }
425    }
426    
427    boolean monitorAndPrintJob() throws IOException, InterruptedException {
428      return job.monitorAndPrintJob();
429    }
430    
431    @Override
432    public String getFailureInfo() throws IOException {
433      try {
434        return job.getStatus().getFailureInfo();
435      } catch (InterruptedException ie) {
436        throw new IOException(ie);
437      }
438    }
439
440  }
441
442  /**
443   * Ugi of the client. We store this ugi when the client is created and 
444   * then make sure that the same ugi is used to run the various protocols.
445   */
446  UserGroupInformation clientUgi;
447  
448  /**
449   * Create a job client.
450   */
451  public JobClient() {
452  }
453    
454  /**
455   * Build a job client with the given {@link JobConf}, and connect to the 
456   * default cluster
457   * 
458   * @param conf the job configuration.
459   * @throws IOException
460   */
461  public JobClient(JobConf conf) throws IOException {
462    init(conf);
463  }
464
465  /**
466   * Build a job client with the given {@link Configuration}, 
467   * and connect to the default cluster
468   * 
469   * @param conf the configuration.
470   * @throws IOException
471   */
472  public JobClient(Configuration conf) throws IOException {
473    init(new JobConf(conf));
474  }
475
476  /**
477   * Connect to the default cluster
478   * @param conf the job configuration.
479   * @throws IOException
480   */
481  public void init(JobConf conf) throws IOException {
482    setConf(conf);
483    cluster = new Cluster(conf);
484    clientUgi = UserGroupInformation.getCurrentUser();
485  }
486
487  /**
488   * Build a job client, connect to the indicated job tracker.
489   * 
490   * @param jobTrackAddr the job tracker to connect to.
491   * @param conf configuration.
492   */
493  public JobClient(InetSocketAddress jobTrackAddr, 
494                   Configuration conf) throws IOException {
495    cluster = new Cluster(jobTrackAddr, conf);
496    clientUgi = UserGroupInformation.getCurrentUser();
497  }
498
499  /**
500   * Close the <code>JobClient</code>.
501   */
502  public synchronized void close() throws IOException {
503    cluster.close();
504  }
505
506  /**
507   * Get a filesystem handle.  We need this to prepare jobs
508   * for submission to the MapReduce system.
509   * 
510   * @return the filesystem handle.
511   */
512  public synchronized FileSystem getFs() throws IOException {
513    try { 
514      return cluster.getFileSystem();
515    } catch (InterruptedException ie) {
516      throw new IOException(ie);
517    }
518  }
519  
520  /**
521   * Get a handle to the Cluster
522   */
523  public Cluster getClusterHandle() {
524    return cluster;
525  }
526  
527  /**
528   * Submit a job to the MR system.
529   * 
530   * This returns a handle to the {@link RunningJob} which can be used to track
531   * the running-job.
532   * 
533   * @param jobFile the job configuration.
534   * @return a handle to the {@link RunningJob} which can be used to track the
535   *         running-job.
536   * @throws FileNotFoundException
537   * @throws InvalidJobConfException
538   * @throws IOException
539   */
540  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
541                                                     InvalidJobConfException, 
542                                                     IOException {
543    // Load in the submitted job details
544    JobConf job = new JobConf(jobFile);
545    return submitJob(job);
546  }
547    
548  /**
549   * Submit a job to the MR system.
550   * This returns a handle to the {@link RunningJob} which can be used to track
551   * the running-job.
552   * 
553   * @param conf the job configuration.
554   * @return a handle to the {@link RunningJob} which can be used to track the
555   *         running-job.
556   * @throws FileNotFoundException
557   * @throws IOException
558   */
559  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
560                                                  IOException {
561    try {
562      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
563      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
564      if (getDelegationTokenCalled) {
565        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
566        getDelegationTokenCalled = false;
567      }
568      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
569        @Override
570        public Job run() throws IOException, ClassNotFoundException, 
571          InterruptedException {
572          Job job = Job.getInstance(conf);
573          job.submit();
574          return job;
575        }
576      });
577      // update our Cluster instance with the one created by Job for submission
578      // (we can't pass our Cluster instance to Job, since Job wraps the config
579      // instance, and the two configs would then diverge)
580      cluster = job.getCluster();
581      return new NetworkedJob(job);
582    } catch (InterruptedException ie) {
583      throw new IOException("interrupted", ie);
584    }
585  }
586
587  private Job getJobUsingCluster(final JobID jobid) throws IOException,
588  InterruptedException {
589    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
590      public Job run() throws IOException, InterruptedException  {
591       return cluster.getJob(jobid);
592      }
593    });
594  }
595  /**
596   * Get an {@link RunningJob} object to track an ongoing job.  Returns
597   * null if the id does not correspond to any known job.
598   * 
599   * @param jobid the jobid of the job.
600   * @return the {@link RunningJob} handle to track the job, null if the 
601   *         <code>jobid</code> doesn't correspond to any known job.
602   * @throws IOException
603   */
604  public RunningJob getJob(final JobID jobid) throws IOException {
605    try {
606      
607      Job job = getJobUsingCluster(jobid);
608      if (job != null) {
609        JobStatus status = JobStatus.downgrade(job.getStatus());
610        if (status != null) {
611          return new NetworkedJob(status, cluster);
612        } 
613      }
614    } catch (InterruptedException ie) {
615      throw new IOException(ie);
616    }
617    return null;
618  }
619
620  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
621   */
622  @Deprecated
623  public RunningJob getJob(String jobid) throws IOException {
624    return getJob(JobID.forName(jobid));
625  }
626  
627  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
628  
629  /**
630   * Get the information of the current state of the map tasks of a job.
631   * 
632   * @param jobId the job to query.
633   * @return the list of all of the map tips.
634   * @throws IOException
635   */
636  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
637    return getTaskReports(jobId, TaskType.MAP);
638  }
639  
640  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
641    IOException {
642    try {
643      Job j = getJobUsingCluster(jobId);
644      if(j == null) {
645        return EMPTY_TASK_REPORTS;
646      }
647      return TaskReport.downgradeArray(j.getTaskReports(type));
648    } catch (InterruptedException ie) {
649      throw new IOException(ie);
650    }
651  }
652  
653  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
654  @Deprecated
655  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
656    return getMapTaskReports(JobID.forName(jobId));
657  }
658  
659  /**
660   * Get the information of the current state of the reduce tasks of a job.
661   * 
662   * @param jobId the job to query.
663   * @return the list of all of the reduce tips.
664   * @throws IOException
665   */    
666  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
667    return getTaskReports(jobId, TaskType.REDUCE);
668  }
669
670  /**
671   * Get the information of the current state of the cleanup tasks of a job.
672   * 
673   * @param jobId the job to query.
674   * @return the list of all of the cleanup tips.
675   * @throws IOException
676   */    
677  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
678    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
679  }
680
681  /**
682   * Get the information of the current state of the setup tasks of a job.
683   * 
684   * @param jobId the job to query.
685   * @return the list of all of the setup tips.
686   * @throws IOException
687   */    
688  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
689    return getTaskReports(jobId, TaskType.JOB_SETUP);
690  }
691
692  
693  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
694  @Deprecated
695  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
696    return getReduceTaskReports(JobID.forName(jobId));
697  }
698  
699  /**
700   * Display the information about a job's tasks, of a particular type and
701   * in a particular state
702   * 
703   * @param jobId the ID of the job
704   * @param type the type of the task (map/reduce/setup/cleanup)
705   * @param state the state of the task 
706   * (pending/running/completed/failed/killed)
707   */
708  public void displayTasks(final JobID jobId, String type, String state) 
709  throws IOException {
710    try {
711      Job job = getJobUsingCluster(jobId);
712      super.displayTasks(job, type, state);
713    } catch (InterruptedException ie) {
714      throw new IOException(ie);
715    }
716  }
717  
718  /**
719   * Get status information about the Map-Reduce cluster.
720   *  
721   * @return the status information about the Map-Reduce cluster as an object
722   *         of {@link ClusterStatus}.
723   * @throws IOException
724   */
725  public ClusterStatus getClusterStatus() throws IOException {
726    try {
727      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
728        public ClusterStatus run()  throws IOException, InterruptedException {
729          ClusterMetrics metrics = cluster.getClusterStatus();
730          return new ClusterStatus(metrics.getTaskTrackerCount(),
731              metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
732              metrics.getOccupiedMapSlots(),
733              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
734              metrics.getReduceSlotCapacity(),
735              cluster.getJobTrackerStatus(),
736              metrics.getDecommissionedTaskTrackerCount());
737        }
738      });
739    }
740      catch (InterruptedException ie) {
741      throw new IOException(ie);
742    }
743  }
744
745  private  Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
746    Collection<String> list = new ArrayList<String>();
747    for (TaskTrackerInfo info: objs) {
748      list.add(info.getTaskTrackerName());
749    }
750    return list;
751  }
752
753  private  Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
754    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
755    for (TaskTrackerInfo info: objs) {
756      BlackListInfo binfo = new BlackListInfo();
757      binfo.setTrackerName(info.getTaskTrackerName());
758      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
759      binfo.setBlackListReport(info.getBlacklistReport());
760      list.add(binfo);
761    }
762    return list;
763  }
764
765  /**
766   * Get status information about the Map-Reduce cluster.
767   *  
768   * @param  detailed if true then get a detailed status including the
769   *         tracker names
770   * @return the status information about the Map-Reduce cluster as an object
771   *         of {@link ClusterStatus}.
772   * @throws IOException
773   */
774  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
775    try {
776      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
777        public ClusterStatus run() throws IOException, InterruptedException {
778        ClusterMetrics metrics = cluster.getClusterStatus();
779        return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
780          arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
781          cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
782          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
783          metrics.getReduceSlotCapacity(), 
784          cluster.getJobTrackerStatus());
785        }
786      });
787    } catch (InterruptedException ie) {
788      throw new IOException(ie);
789    }
790  }
791    
792
793  /** 
794   * Get the jobs that are not completed and not failed.
795   * 
796   * @return array of {@link JobStatus} for the running/to-be-run jobs.
797   * @throws IOException
798   */
799  public JobStatus[] jobsToComplete() throws IOException {
800    List<JobStatus> stats = new ArrayList<JobStatus>();
801    for (JobStatus stat : getAllJobs()) {
802      if (!stat.isJobComplete()) {
803        stats.add(stat);
804      }
805    }
806    return stats.toArray(new JobStatus[0]);
807  }
808
809  /** 
810   * Get the jobs that are submitted.
811   * 
812   * @return array of {@link JobStatus} for the submitted jobs.
813   * @throws IOException
814   */
815  public JobStatus[] getAllJobs() throws IOException {
816    try {
817      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
818          clientUgi.doAs(new PrivilegedExceptionAction<
819              org.apache.hadoop.mapreduce.JobStatus[]> () {
820            public org.apache.hadoop.mapreduce.JobStatus[] run() 
821                throws IOException, InterruptedException {
822              return cluster.getAllJobStatuses();
823            }
824          });
825      JobStatus[] stats = new JobStatus[jobs.length];
826      for (int i = 0; i < jobs.length; i++) {
827        stats[i] = JobStatus.downgrade(jobs[i]);
828      }
829      return stats;
830    } catch (InterruptedException ie) {
831      throw new IOException(ie);
832    }
833  }
834  
835  /** 
836   * Utility that submits a job, then polls for progress until the job is
837   * complete.
838   * 
839   * @param job the job configuration.
840   * @throws IOException if the job fails
841   */
842  public static RunningJob runJob(JobConf job) throws IOException {
843    JobClient jc = new JobClient(job);
844    RunningJob rj = jc.submitJob(job);
845    try {
846      if (!jc.monitorAndPrintJob(job, rj)) {
847        throw new IOException("Job failed!");
848      }
849    } catch (InterruptedException ie) {
850      Thread.currentThread().interrupt();
851    }
852    return rj;
853  }
854  
855  /**
856   * Monitor a job and print status in real-time as progress is made and tasks 
857   * fail.
858   * @param conf the job's configuration
859   * @param job the job to track
860   * @return true if the job succeeded
861   * @throws IOException if communication to the JobTracker fails
862   */
863  public boolean monitorAndPrintJob(JobConf conf, 
864                                    RunningJob job
865  ) throws IOException, InterruptedException {
866    return ((NetworkedJob)job).monitorAndPrintJob();
867  }
868
869  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
870    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
871  }
872  
873  static Configuration getConfiguration(String jobTrackerSpec)
874  {
875    Configuration conf = new Configuration();
876    if (jobTrackerSpec != null) {        
877      if (jobTrackerSpec.indexOf(":") >= 0) {
878        conf.set("mapred.job.tracker", jobTrackerSpec);
879      } else {
880        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
881        URL validate = conf.getResource(classpathFile);
882        if (validate == null) {
883          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
884        }
885        conf.addResource(classpathFile);
886      }
887    }
888    return conf;
889  }
890
891  /**
892   * Sets the output filter for tasks. only those tasks are printed whose
893   * output matches the filter. 
894   * @param newValue task filter.
895   */
896  @Deprecated
897  public void setTaskOutputFilter(TaskStatusFilter newValue){
898    this.taskOutputFilter = newValue;
899  }
900    
901  /**
902   * Get the task output filter out of the JobConf.
903   * 
904   * @param job the JobConf to examine.
905   * @return the filter level.
906   */
907  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
908    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
909                                            "FAILED"));
910  }
911    
912  /**
913   * Modify the JobConf to set the task output filter.
914   * 
915   * @param job the JobConf to modify.
916   * @param newValue the value to set.
917   */
918  public static void setTaskOutputFilter(JobConf job, 
919                                         TaskStatusFilter newValue) {
920    job.set("jobclient.output.filter", newValue.toString());
921  }
922    
923  /**
924   * Returns task output filter.
925   * @return task filter. 
926   */
927  @Deprecated
928  public TaskStatusFilter getTaskOutputFilter(){
929    return this.taskOutputFilter; 
930  }
931
932  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
933      String counterGroupName, String counterName) throws IOException {
934    Counters counters = Counters.downgrade(cntrs);
935    return counters.findCounter(counterGroupName, counterName).getValue();
936  }
937
938  /**
939   * Get status information about the max available Maps in the cluster.
940   *  
941   * @return the max available Maps in the cluster
942   * @throws IOException
943   */
944  public int getDefaultMaps() throws IOException {
945    try {
946      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
947        @Override
948        public Integer run() throws IOException, InterruptedException {
949          return cluster.getClusterStatus().getMapSlotCapacity();
950        }
951      });
952    } catch (InterruptedException ie) {
953      throw new IOException(ie);
954    }
955  }
956
957  /**
958   * Get status information about the max available Reduces in the cluster.
959   *  
960   * @return the max available Reduces in the cluster
961   * @throws IOException
962   */
963  public int getDefaultReduces() throws IOException {
964    try {
965      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
966        @Override
967        public Integer run() throws IOException, InterruptedException {
968          return cluster.getClusterStatus().getReduceSlotCapacity();
969        }
970      });
971    } catch (InterruptedException ie) {
972      throw new IOException(ie);
973    }
974  }
975
976  /**
977   * Grab the jobtracker system directory path where job-specific files are to be placed.
978   * 
979   * @return the system directory where job-specific files are to be placed.
980   */
981  public Path getSystemDir() {
982    try {
983      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
984        @Override
985        public Path run() throws IOException, InterruptedException {
986          return cluster.getSystemDir();
987        }
988      });
989      } catch (IOException ioe) {
990      return null;
991    } catch (InterruptedException ie) {
992      return null;
993    }
994  }
995
996  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
997    JobQueueInfo ret = new JobQueueInfo(queue);
998    // make sure to convert any children
999    if (queue.getQueueChildren().size() > 0) {
1000      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
1001          .getQueueChildren().size());
1002      for (QueueInfo child : queue.getQueueChildren()) {
1003        childQueues.add(getJobQueueInfo(child));
1004      }
1005      ret.setChildren(childQueues);
1006    }
1007    return ret;
1008  }
1009
1010  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
1011      throws IOException {
1012    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
1013    for (int i = 0; i < queues.length; i++) {
1014      ret[i] = getJobQueueInfo(queues[i]);
1015    }
1016    return ret;
1017  }
1018
1019  /**
1020   * Returns an array of queue information objects about root level queues
1021   * configured
1022   *
1023   * @return the array of root level JobQueueInfo objects
1024   * @throws IOException
1025   */
1026  public JobQueueInfo[] getRootQueues() throws IOException {
1027    try {
1028      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1029        public JobQueueInfo[] run() throws IOException, InterruptedException {
1030          return getJobQueueInfoArray(cluster.getRootQueues());
1031        }
1032      });
1033    } catch (InterruptedException ie) {
1034      throw new IOException(ie);
1035    }
1036  }
1037
1038  /**
1039   * Returns an array of queue information objects about immediate children
1040   * of queue queueName.
1041   * 
1042   * @param queueName
1043   * @return the array of immediate children JobQueueInfo objects
1044   * @throws IOException
1045   */
1046  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
1047    try {
1048      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1049        public JobQueueInfo[] run() throws IOException, InterruptedException {
1050          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
1051        }
1052      });
1053    } catch (InterruptedException ie) {
1054      throw new IOException(ie);
1055    }
1056  }
1057  
1058  /**
1059   * Return an array of queue information objects about all the Job Queues
1060   * configured.
1061   * 
1062   * @return Array of JobQueueInfo objects
1063   * @throws IOException
1064   */
1065  public JobQueueInfo[] getQueues() throws IOException {
1066    try {
1067      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1068        public JobQueueInfo[] run() throws IOException, InterruptedException {
1069          return getJobQueueInfoArray(cluster.getQueues());
1070        }
1071      });
1072    } catch (InterruptedException ie) {
1073      throw new IOException(ie);
1074    }
1075  }
1076  
1077  /**
1078   * Gets all the jobs which were added to particular Job Queue
1079   * 
1080   * @param queueName name of the Job Queue
1081   * @return Array of jobs present in the job queue
1082   * @throws IOException
1083   */
1084  
1085  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
1086    try {
1087      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
1088        @Override
1089        public QueueInfo run() throws IOException, InterruptedException {
1090          return cluster.getQueue(queueName);
1091        }
1092      });
1093      if (queue == null) {
1094        return null;
1095      }
1096      org.apache.hadoop.mapreduce.JobStatus[] stats = 
1097        queue.getJobStatuses();
1098      JobStatus[] ret = new JobStatus[stats.length];
1099      for (int i = 0 ; i < stats.length; i++ ) {
1100        ret[i] = JobStatus.downgrade(stats[i]);
1101      }
1102      return ret;
1103    } catch (InterruptedException ie) {
1104      throw new IOException(ie);
1105    }
1106  }
1107  
1108  /**
1109   * Gets the queue information associated to a particular Job Queue
1110   * 
1111   * @param queueName name of the job queue.
1112   * @return Queue information associated to particular queue.
1113   * @throws IOException
1114   */
1115  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
1116    try {
1117      QueueInfo queueInfo = clientUgi.doAs(new 
1118          PrivilegedExceptionAction<QueueInfo>() {
1119        public QueueInfo run() throws IOException, InterruptedException {
1120          return cluster.getQueue(queueName);
1121        }
1122      });
1123      if (queueInfo != null) {
1124        return new JobQueueInfo(queueInfo);
1125      }
1126      return null;
1127    } catch (InterruptedException ie) {
1128      throw new IOException(ie);
1129    }
1130  }
1131  
1132  /**
1133   * Gets the Queue ACLs for current user
1134   * @return array of QueueAclsInfo object for current user.
1135   * @throws IOException
1136   */
1137  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
1138    try {
1139      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
1140        clientUgi.doAs(new 
1141            PrivilegedExceptionAction
1142            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
1143              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
1144              throws IOException, InterruptedException {
1145                return cluster.getQueueAclsForCurrentUser();
1146              }
1147        });
1148      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
1149      for (int i = 0 ; i < acls.length; i++ ) {
1150        ret[i] = QueueAclsInfo.downgrade(acls[i]);
1151      }
1152      return ret;
1153    } catch (InterruptedException ie) {
1154      throw new IOException(ie);
1155    }
1156  }
1157
1158  /**
1159   * Get a delegation token for the user from the JobTracker.
1160   * @param renewer the user who can renew the token
1161   * @return the new token
1162   * @throws IOException
1163   */
1164  public Token<DelegationTokenIdentifier> 
1165    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
1166    getDelegationTokenCalled = true;
1167    return clientUgi.doAs(new 
1168        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
1169      public Token<DelegationTokenIdentifier> run() throws IOException, 
1170      InterruptedException {
1171        return cluster.getDelegationToken(renewer);
1172      }
1173    });
1174  }
1175
1176  /**
1177   * Renew a delegation token
1178   * @param token the token to renew
1179   * @return true if the renewal went well
1180   * @throws InvalidToken
1181   * @throws IOException
1182   * @deprecated Use {@link Token#renew} instead
1183   */
1184  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
1185                                   ) throws InvalidToken, IOException, 
1186                                            InterruptedException {
1187    return token.renew(getConf());
1188  }
1189
1190  /**
1191   * Cancel a delegation token from the JobTracker
1192   * @param token the token to cancel
1193   * @throws IOException
1194   * @deprecated Use {@link Token#cancel} instead
1195   */
1196  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
1197                                    ) throws InvalidToken, IOException, 
1198                                             InterruptedException {
1199    token.cancel(getConf());
1200  }
1201
1202  /**
1203   */
1204  public static void main(String argv[]) throws Exception {
1205    int res = ToolRunner.run(new JobClient(), argv);
1206    System.exit(res);
1207  }
1208}
1209