/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 * 
 * <code>JobClient</code> provides facilities to submit jobs, track their 
 * progress, access component-tasks' reports/logs, and obtain Map-Reduce 
 * cluster status information.
 * 
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the 
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory 
 *   on the distributed file-system. 
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol></p>
 *  
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
 * the job and monitor its progress.
 * 
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 * 
 * <h4 id="JobControl">Job Control</h4>
 * 
 * <p>At times clients need to chain map-reduce jobs to accomplish complex 
 * tasks which cannot be done via a single map-reduce job. This is fairly easy 
 * since the output of a job typically goes to the distributed file-system, 
 * where it can be used as the input for the next job.</p>
 * 
 * <p>However, this also means that the onus of ensuring jobs are complete 
 * (success/failure) lies squarely on the clients. In such situations the 
 * various job-control options are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after 
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client then 
 *   polls the returned handle to the {@link RunningJob} to query status and 
 *   make scheduling decisions, as sketched below.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   on job-completion, thus avoiding polling.
 *   </li>
 * </ol></p>
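 * 
 * <p>For instance, a minimal submit-and-poll loop might look like the
 * following sketch (the poll interval and the failure handling are
 * illustrative, not prescribed by the API):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);                 // arbitrary poll interval
 *     }
 *     if (!running.isSuccessful()) {
 *       // react to the failure, e.g. inspect running.getFailureInfo()
 *     }
 * </pre></blockquote></p>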
 * 
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // pairs of sleep-time (ms) and number of retries:
                          // t1,n1,t2,n2,...

  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  
  static {
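    // Pull in the MapReduce configuration resources (e.g. mapred-site.xml)
    // and the deprecated-key mappings before any JobConf is used.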
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It wraps a
   * {@link Job} object to provide job information, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * The wrapped {@link Job} is our only link to the cluster; if it is
     * null we cannot perform any of the tasks.  The job might be null if
     * the cluster has completely forgotten about the job.  (eg, 24 hours
     * after the job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }
    
    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }
    
    /** @deprecated This method is deprecated and will be removed. Applications should 
     * rather use {@link #getID()}.*/
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }
    
    /**
     * The user-specified job name
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job. 
     */
    public synchronized void setJobPriority(String priority) 
                                                throws IOException {
      try {
        job.setPriority(
          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Kill indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job. 
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events = 
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns a string representation of the job's status.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }
    
    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }
    
    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and 
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;
  
  /**
   * Create a job client.
   */
  public JobClient() {
  }
    
  /**
   * Build a job client with the given {@link JobConf}, and connect to the 
   * default cluster.
   * 
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration}, 
   * and connect to the default cluster.
   * 
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client and connect to the indicated job tracker.
   * 
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr, 
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   * 
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }
  
  /**
   * Submit a job to the MR system.
   * 
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
                                                     InvalidJobConfException, 
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }
    
  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
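      // Default to the old (mapred) API unless the job has explicitly
      // opted into the new (mapreduce) API.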
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException, 
          InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   * 
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the 
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
   */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }
  
  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
  
  /**
   * Get the information of the current state of the map tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }
  
  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
    IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Get the information of the current state of the reduce tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */    
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */    
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */    
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Display information about a job's tasks of a particular type, in a 
   * particular state.
   * 
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task 
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state) 
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get status information about the Map-Reduce cluster.
   *  
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(),
              metrics.getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *  
   * @param  detailed if true then get a detailed status including the
   *         tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** 
   * Get the jobs that are not completed and not failed.
   * 
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /** 
   * Get the jobs that are submitted.
   * 
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run() 
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /** 
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   * 
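   * <p>A minimal sketch of typical use (the class name, job name and paths
   * are illustrative):</p>
   * <pre>
   *   JobConf conf = new JobConf(MyJob.class);
   *   conf.setJobName("wordcount");
   *   FileInputFormat.setInputPaths(conf, new Path("in"));
   *   FileOutputFormat.setOutputPath(conf, new Path("out"));
   *   JobClient.runJob(conf);  // blocks; throws IOException if the job fails
   * </pre>
   * 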
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
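      // Restore the interrupt flag and fall through: the caller still gets
      // the RunningJob handle even though monitoring was interrupted.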
      Thread.currentThread().interrupt();
    }
    return rj;
  }
  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, 
                                    RunningJob job) 
      throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }
  
  static Configuration getConfiguration(String jobTrackerSpec) {
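    // A spec containing ':' is taken to be a host:port pair (e.g.
    // "jt.example.com:8021", illustrative); anything else names a
    // per-cluster config file, "hadoop-<spec>.xml", on the classpath.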
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks are printed whose
   * output matches the filter. 
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }
    
  /**
   * Get the task output filter out of the JobConf.
   * 
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
                                            "FAILED"));
  }
    
  /**
   * Modify the JobConf to set the task output filter.
   * 
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job, 
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
    
  /**
   * Returns task output filter.
   * @return task filter. 
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get the maximum number of map slots available in the cluster.
   *  
   * @return the max available map slots in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the maximum number of reduce slots available in the cluster.
   *  
   * @return the max available reduce slots in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are to be placed.
   * 
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
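    // The directory is considered valid only if it contains both "job.xml"
    // and "job.split".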
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   * 
   * @return path to staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured in the system.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate children
   * of queue queueName.
   * 
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   * 
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets all the jobs which were added to a particular job queue.
   * 
   * @param queueName name of the job queue
   * @return array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats = 
        queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the queue information associated with a particular job queue.
   * 
   * @param queueName name of the job queue.
   * @return queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new 
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
        clientUgi.doAs(new 
            PrivilegedExceptionAction
            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
              throws IOException, InterruptedException {
                return cluster.getQueueAclsForCurrentUser();
              }
        });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
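   * <p>A sketch of typical use (the renewer principal shown is
   * illustrative):
   * <pre>
   *   Token&lt;DelegationTokenIdentifier&gt; token =
   *       client.getDelegationToken(new Text("yarn"));
   * </pre>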
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier> 
    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
    return clientUgi.doAs(new 
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException, 
      InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException, 
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException, 
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}