001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.URL;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.io.Text;
036import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
037import org.apache.hadoop.mapreduce.Cluster;
038import org.apache.hadoop.mapreduce.ClusterMetrics;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.mapreduce.QueueInfo;
041import org.apache.hadoop.mapreduce.TaskTrackerInfo;
042import org.apache.hadoop.mapreduce.TaskType;
043import org.apache.hadoop.mapreduce.filecache.DistributedCache;
044import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
045import org.apache.hadoop.mapreduce.tools.CLI;
046import org.apache.hadoop.mapreduce.util.ConfigUtil;
047import org.apache.hadoop.security.UserGroupInformation;
048import org.apache.hadoop.security.token.SecretManager.InvalidToken;
049import org.apache.hadoop.security.token.Token;
050import org.apache.hadoop.security.token.TokenRenewer;
051import org.apache.hadoop.util.Tool;
052import org.apache.hadoop.util.ToolRunner;
053
054/**
055 * <code>JobClient</code> is the primary interface for the user-job to interact
056 * with the cluster.
057 * 
058 * <code>JobClient</code> provides facilities to submit jobs, track their 
059 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
060 * status information etc.
061 * 
062 * <p>The job submission process involves:
063 * <ol>
064 *   <li>
065 *   Checking the input and output specifications of the job.
066 *   </li>
067 *   <li>
068 *   Computing the {@link InputSplit}s for the job.
069 *   </li>
070 *   <li>
071 *   Setup the requisite accounting information for the {@link DistributedCache} 
072 *   of the job, if necessary.
073 *   </li>
074 *   <li>
075 *   Copying the job's jar and configuration to the map-reduce system directory 
076 *   on the distributed file-system. 
077 *   </li>
078 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
081 *   </li>
082 * </ol></p>
083 *  
084 * Normally the user creates the application, describes various facets of the
085 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
086 * the job and monitor its progress.
087 * 
088 * <p>Here is an example on how to use <code>JobClient</code>:</p>
089 * <p><blockquote><pre>
090 *     // Create a new JobConf
091 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
092 *     
093 *     // Specify various job-specific parameters     
094 *     job.setJobName("myjob");
095 *     
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
098 *     
099 *     job.setMapperClass(MyJob.MyMapper.class);
100 *     job.setReducerClass(MyJob.MyReducer.class);
101 *
102 *     // Submit the job, then poll for progress until the job is complete
103 *     JobClient.runJob(job);
104 * </pre></blockquote></p>
105 * 
106 * <h4 id="JobControl">Job Control</h4>
107 * 
 * <p>At times clients chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of a job typically goes to the distributed file-system, and that
 * output can be used as the input for the next job.</p>
 * 
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
116 * <ol>
117 *   <li>
118 *   {@link #runJob(JobConf)} : submits the job and returns only after 
119 *   the job has completed.
120 *   </li>
121 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client can then
 *   poll the returned handle to the {@link RunningJob} to query status and
 *   make scheduling decisions (see the sketch after this list).
125 *   </li>
126 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   on job-completion, thus avoiding polling.
129 *   </li>
130 * </ol></p>
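 *
 * <p>A minimal sketch of the polling approach (option 2), assuming
 * <code>conf</code> is a fully configured {@link JobConf}; exception
 * handling is omitted and the five-second interval is illustrative:</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(conf);
 *     RunningJob running = client.submitJob(conf);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);                 // poll for completion
 *     }
 *     if (!running.isSuccessful()) {
 *       // react to the failure, e.g. skip jobs that depend on this one
 *     }
 * </pre></blockquote></p>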
131 * 
132 * @see JobConf
133 * @see ClusterStatus
134 * @see Tool
135 * @see DistributedCache
136 */
137@InterfaceAudience.Public
138@InterfaceStability.Stable
139public class JobClient extends CLI {
140
141  @InterfaceAudience.Private
142  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
143      "mapreduce.jobclient.retry.policy.enabled";
144  @InterfaceAudience.Private
145  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
146      false;
147  @InterfaceAudience.Private
148  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
149      "mapreduce.jobclient.retry.policy.spec";
150  @InterfaceAudience.Private
151  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // pairs of sleep-time (ms) and retry count: t1,n1,t2,n2,...
153
154  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
155  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
156  
157  static{
158    ConfigUtil.loadResources();
159  }
160
161  /**
162   * A NetworkedJob is an implementation of RunningJob.  It holds
163   * a JobProfile object to provide some info, and interacts with the
164   * remote service to provide certain functionality.
165   */
166  static class NetworkedJob implements RunningJob {
167    Job job;
    /**
     * We hold the underlying {@link Job} and delegate to it for all
     * operations.  If the job is null, then we cannot perform any of the
     * tasks.  The job might be null if the cluster has completely forgotten
     * about the job (e.g., 24 hours after the job completes).
     */
175    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
176      this(status, cluster, new JobConf(status.getJobFile()));
177    }
178    
179    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
180        throws IOException {
181      this(Job.getInstance(cluster, status, conf));
182    }
183
184    public NetworkedJob(Job job) throws IOException {
185      this.job = job;
186    }
187
188    public Configuration getConfiguration() {
189      return job.getConfiguration();
190    }
191
192    /**
193     * An identifier for the job
194     */
195    public JobID getID() {
196      return JobID.downgrade(job.getJobID());
197    }
198    
199    /** @deprecated This method is deprecated and will be removed. Applications should 
200     * rather use {@link #getID()}.*/
201    @Deprecated
202    public String getJobID() {
203      return getID().toString();
204    }
205    
206    /**
207     * The user-specified job name
208     */
209    public String getJobName() {
210      return job.getJobName();
211    }
212
213    /**
214     * The name of the job file
215     */
216    public String getJobFile() {
217      return job.getJobFile();
218    }
219
220    /**
221     * A URL where the job's status can be seen
222     */
223    public String getTrackingURL() {
224      return job.getTrackingURL();
225    }
226
227    /**
228     * A float between 0.0 and 1.0, indicating the % of map work
229     * completed.
230     */
231    public float mapProgress() throws IOException {
232      return job.mapProgress();
233    }
234
235    /**
236     * A float between 0.0 and 1.0, indicating the % of reduce work
237     * completed.
238     */
239    public float reduceProgress() throws IOException {
240      return job.reduceProgress();
241    }
242
243    /**
244     * A float between 0.0 and 1.0, indicating the % of cleanup work
245     * completed.
246     */
247    public float cleanupProgress() throws IOException {
248      try {
249        return job.cleanupProgress();
250      } catch (InterruptedException ie) {
251        throw new IOException(ie);
252      }
253    }
254
255    /**
256     * A float between 0.0 and 1.0, indicating the % of setup work
257     * completed.
258     */
259    public float setupProgress() throws IOException {
260      return job.setupProgress();
261    }
262
263    /**
264     * Returns immediately whether the whole job is done yet or not.
265     */
266    public synchronized boolean isComplete() throws IOException {
267      return job.isComplete();
268    }
269
270    /**
271     * True iff job completed successfully.
272     */
273    public synchronized boolean isSuccessful() throws IOException {
274      return job.isSuccessful();
275    }
276
277    /**
278     * Blocks until the job is finished
279     */
280    public void waitForCompletion() throws IOException {
281      try {
282        job.waitForCompletion(false);
283      } catch (InterruptedException ie) {
284        throw new IOException(ie);
285      } catch (ClassNotFoundException ce) {
286        throw new IOException(ce);
287      }
288    }
289
290    /**
     * Queries the service for the current state of the job.
292     */
293    public synchronized int getJobState() throws IOException {
294      try {
295        return job.getJobState().getValue();
296      } catch (InterruptedException ie) {
297        throw new IOException(ie);
298      }
299    }
300    
301    /**
302     * Tells the service to terminate the current job.
303     */
304    public synchronized void killJob() throws IOException {
305      job.killJob();
306    }
307   
308    
309    /** Set the priority of the job.
310    * @param priority new priority of the job. 
311    */
312    public synchronized void setJobPriority(String priority) 
313                                                throws IOException {
314      try {
315        job.setPriority(
316          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
317      } catch (InterruptedException ie) {
318        throw new IOException(ie);
319      }
320    }
321    
322    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list; otherwise it is just killed, without affecting job failure
     * status.
327     */
328    public synchronized void killTask(TaskAttemptID taskId,
329        boolean shouldFail) throws IOException {
330      if (shouldFail) {
331        job.failTask(taskId);
332      } else {
333        job.killTask(taskId);
334      }
335    }
336
337    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
338    @Deprecated
339    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
340      killTask(TaskAttemptID.forName(taskId), shouldFail);
341    }
342    
343    /**
344     * Fetch task completion events from cluster for this job. 
345     */
346    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
347        int startFrom) throws IOException {
348      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
354        }
355        return ret;
356      } catch (InterruptedException ie) {
357        throw new IOException(ie);
358      }
359    }
360
361    /**
     * Returns a string representation of the job.
363     */
364    @Override
365    public String toString() {
366      return job.toString();
367    }
368        
369    /**
370     * Returns the counters for this job
371     */
372    public Counters getCounters() throws IOException {
373      Counters result = null;
374      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
375      if(temp != null) {
376        result = Counters.downgrade(temp);
377      }
378      return result;
379    }
380    
381    @Override
382    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
383      try { 
384        return job.getTaskDiagnostics(id);
385      } catch (InterruptedException ie) {
386        throw new IOException(ie);
387      }
388    }
389
390    public String getHistoryUrl() throws IOException {
391      try {
392        return job.getHistoryUrl();
393      } catch (InterruptedException ie) {
394        throw new IOException(ie);
395      }
396    }
397
398    public boolean isRetired() throws IOException {
399      try {
400        return job.isRetired();
401      } catch (InterruptedException ie) {
402        throw new IOException(ie);
403      }
404    }
405    
406    boolean monitorAndPrintJob() throws IOException, InterruptedException {
407      return job.monitorAndPrintJob();
408    }
409    
410    @Override
411    public String getFailureInfo() throws IOException {
412      try {
413        return job.getStatus().getFailureInfo();
414      } catch (InterruptedException ie) {
415        throw new IOException(ie);
416      }
417    }
418
419    @Override
420    public JobStatus getJobStatus() throws IOException {
421      try {
422        return JobStatus.downgrade(job.getStatus());
423      } catch (InterruptedException ie) {
424        throw new IOException(ie);
425      }
426    }
427  }
428
429  /**
430   * Ugi of the client. We store this ugi when the client is created and 
431   * then make sure that the same ugi is used to run the various protocols.
432   */
433  UserGroupInformation clientUgi;
434  
435  /**
436   * Create a job client.
437   */
438  public JobClient() {
439  }
440    
441  /**
442   * Build a job client with the given {@link JobConf}, and connect to the 
443   * default cluster
444   * 
445   * @param conf the job configuration.
446   * @throws IOException
447   */
448  public JobClient(JobConf conf) throws IOException {
449    init(conf);
450  }
451
452  /**
453   * Build a job client with the given {@link Configuration}, 
454   * and connect to the default cluster
455   * 
456   * @param conf the configuration.
457   * @throws IOException
458   */
459  public JobClient(Configuration conf) throws IOException {
460    init(new JobConf(conf));
461  }
462
463  /**
464   * Connect to the default cluster
465   * @param conf the job configuration.
466   * @throws IOException
467   */
468  public void init(JobConf conf) throws IOException {
469    setConf(conf);
470    cluster = new Cluster(conf);
471    clientUgi = UserGroupInformation.getCurrentUser();
472  }
473
474  /**
475   * Build a job client, connect to the indicated job tracker.
476   * 
477   * @param jobTrackAddr the job tracker to connect to.
478   * @param conf configuration.
479   */
480  public JobClient(InetSocketAddress jobTrackAddr, 
481                   Configuration conf) throws IOException {
482    cluster = new Cluster(jobTrackAddr, conf);
483    clientUgi = UserGroupInformation.getCurrentUser();
484  }
485
486  /**
487   * Close the <code>JobClient</code>.
488   */
489  public synchronized void close() throws IOException {
490    cluster.close();
491  }
492
493  /**
494   * Get a filesystem handle.  We need this to prepare jobs
495   * for submission to the MapReduce system.
496   * 
497   * @return the filesystem handle.
498   */
499  public synchronized FileSystem getFs() throws IOException {
500    try { 
501      return cluster.getFileSystem();
502    } catch (InterruptedException ie) {
503      throw new IOException(ie);
504    }
505  }
506  
507  /**
508   * Get a handle to the Cluster
509   */
510  public Cluster getClusterHandle() {
511    return cluster;
512  }
513  
514  /**
515   * Submit a job to the MR system.
516   * 
517   * This returns a handle to the {@link RunningJob} which can be used to track
518   * the running-job.
519   * 
520   * @param jobFile the job configuration.
521   * @return a handle to the {@link RunningJob} which can be used to track the
522   *         running-job.
523   * @throws FileNotFoundException
524   * @throws InvalidJobConfException
525   * @throws IOException
526   */
527  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
528                                                     InvalidJobConfException, 
529                                                     IOException {
530    // Load in the submitted job details
531    JobConf job = new JobConf(jobFile);
532    return submitJob(job);
533  }
534    
535  /**
536   * Submit a job to the MR system.
537   * This returns a handle to the {@link RunningJob} which can be used to track
538   * the running-job.
539   * 
540   * @param conf the job configuration.
541   * @return a handle to the {@link RunningJob} which can be used to track the
542   *         running-job.
543   * @throws FileNotFoundException
544   * @throws IOException
545   */
546  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
547                                                  IOException {
548    return submitJobInternal(conf);
549  }
550
551  @InterfaceAudience.Private
552  public RunningJob submitJobInternal(final JobConf conf)
553      throws FileNotFoundException, IOException {
554    try {
555      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
556      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
557      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
558        @Override
559        public Job run() throws IOException, ClassNotFoundException, 
560          InterruptedException {
561          Job job = Job.getInstance(conf);
562          job.submit();
563          return job;
564        }
565      });
566
567      Cluster prev = cluster;
568      // update our Cluster instance with the one created by Job for submission
569      // (we can't pass our Cluster instance to Job, since Job wraps the config
570      // instance, and the two configs would then diverge)
571      cluster = job.getCluster();
572
573      // It is important to close the previous cluster instance
574      // to cleanup resources.
575      if (prev != null) {
576        prev.close();
577      }
578      return new NetworkedJob(job);
579    } catch (InterruptedException ie) {
580      throw new IOException("interrupted", ie);
581    }
582  }
583
584  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
586    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
587      public Job run() throws IOException, InterruptedException  {
588       return cluster.getJob(jobid);
589      }
590    });
591  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
595   * 
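   * <p>A small usage sketch; the job id string below is illustrative:</p>
   * <pre>
   *     RunningJob rj = client.getJob(JobID.forName("job_201401011200_0001"));
   *     if (rj != null) {
   *       System.out.println(rj.getJobName() + ": " + rj.mapProgress());
   *     }
   * </pre>
   *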
596   * @param jobid the jobid of the job.
597   * @return the {@link RunningJob} handle to track the job, null if the 
598   *         <code>jobid</code> doesn't correspond to any known job.
599   * @throws IOException
600   */
601  public RunningJob getJob(final JobID jobid) throws IOException {
602    try {
603      
604      Job job = getJobUsingCluster(jobid);
605      if (job != null) {
606        JobStatus status = JobStatus.downgrade(job.getStatus());
607        if (status != null) {
608          return new NetworkedJob(status, cluster,
609              new JobConf(job.getConfiguration()));
610        } 
611      }
612    } catch (InterruptedException ie) {
613      throw new IOException(ie);
614    }
615    return null;
616  }
617
618  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
619   */
620  @Deprecated
621  public RunningJob getJob(String jobid) throws IOException {
622    return getJob(JobID.forName(jobid));
623  }
624  
625  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
626  
627  /**
   * Get information about the current state of the map tasks of a job.
629   * 
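   * <p>For example, a simple progress dump over the map tasks (a sketch;
   * <code>client</code> and <code>jobId</code> are assumed to already exist):</p>
   * <pre>
   *     for (TaskReport report : client.getMapTaskReports(jobId)) {
   *       System.out.println(report.getTaskID() + " "
   *           + (report.getProgress() * 100) + "%");
   *     }
   * </pre>
   *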
630   * @param jobId the job to query.
631   * @return the list of all of the map tips.
632   * @throws IOException
633   */
634  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
635    return getTaskReports(jobId, TaskType.MAP);
636  }
637  
638  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
639    IOException {
640    try {
641      Job j = getJobUsingCluster(jobId);
642      if(j == null) {
643        return EMPTY_TASK_REPORTS;
644      }
645      return TaskReport.downgradeArray(j.getTaskReports(type));
646    } catch (InterruptedException ie) {
647      throw new IOException(ie);
648    }
649  }
650  
651  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
652  @Deprecated
653  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
654    return getMapTaskReports(JobID.forName(jobId));
655  }
656  
657  /**
   * Get information about the current state of the reduce tasks of a job.
659   * 
660   * @param jobId the job to query.
661   * @return the list of all of the reduce tips.
662   * @throws IOException
663   */    
664  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
665    return getTaskReports(jobId, TaskType.REDUCE);
666  }
667
668  /**
   * Get information about the current state of the cleanup tasks of a job.
670   * 
671   * @param jobId the job to query.
672   * @return the list of all of the cleanup tips.
673   * @throws IOException
674   */    
675  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
676    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
677  }
678
679  /**
   * Get information about the current state of the setup tasks of a job.
681   * 
682   * @param jobId the job to query.
683   * @return the list of all of the setup tips.
684   * @throws IOException
685   */    
686  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
687    return getTaskReports(jobId, TaskType.JOB_SETUP);
688  }
689
690  
691  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
692  @Deprecated
693  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
694    return getReduceTaskReports(JobID.forName(jobId));
695  }
696  
697  /**
698   * Display the information about a job's tasks, of a particular type and
699   * in a particular state
700   * 
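   * <p>For example, to list the completed map tasks of a job (a sketch using
   * the type and state strings documented below):</p>
   * <pre>
   *     client.displayTasks(jobId, "map", "completed");
   * </pre>
   *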
701   * @param jobId the ID of the job
702   * @param type the type of the task (map/reduce/setup/cleanup)
703   * @param state the state of the task 
704   * (pending/running/completed/failed/killed)
705   */
706  public void displayTasks(final JobID jobId, String type, String state) 
707  throws IOException {
708    try {
709      Job job = getJobUsingCluster(jobId);
710      super.displayTasks(job, type, state);
711    } catch (InterruptedException ie) {
712      throw new IOException(ie);
713    }
714  }
715  
716  /**
717   * Get status information about the Map-Reduce cluster.
718   *  
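   * <p>For instance, a quick capacity check (a sketch; <code>client</code> is
   * an already-initialized <code>JobClient</code>):</p>
   * <pre>
   *     ClusterStatus status = client.getClusterStatus();
   *     System.out.println(status.getTaskTrackers() + " trackers, "
   *         + status.getMaxMapTasks() + " map slots, "
   *         + status.getMaxReduceTasks() + " reduce slots");
   * </pre>
   *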
719   * @return the status information about the Map-Reduce cluster as an object
720   *         of {@link ClusterStatus}.
721   * @throws IOException
722   */
723  public ClusterStatus getClusterStatus() throws IOException {
724    try {
725      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
726        public ClusterStatus run() throws IOException, InterruptedException {
727          ClusterMetrics metrics = cluster.getClusterStatus();
728          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
729            .getBlackListedTaskTrackerCount(), cluster
730            .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
731            metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
732            metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
733            metrics.getDecommissionedTaskTrackerCount(), metrics
734              .getGrayListedTaskTrackerCount());
735        }
736      });
737    } catch (InterruptedException ie) {
738      throw new IOException(ie);
739    }
740  }
741
742  private  Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
743    Collection<String> list = new ArrayList<String>();
744    for (TaskTrackerInfo info: objs) {
745      list.add(info.getTaskTrackerName());
746    }
747    return list;
748  }
749
750  private  Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
751    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
752    for (TaskTrackerInfo info: objs) {
753      BlackListInfo binfo = new BlackListInfo();
754      binfo.setTrackerName(info.getTaskTrackerName());
755      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
756      binfo.setBlackListReport(info.getBlacklistReport());
757      list.add(binfo);
758    }
759    return list;
760  }
761
762  /**
763   * Get status information about the Map-Reduce cluster.
764   *  
765   * @param  detailed if true then get a detailed status including the
766   *         tracker names
767   * @return the status information about the Map-Reduce cluster as an object
768   *         of {@link ClusterStatus}.
769   * @throws IOException
770   */
771  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
772    try {
773      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
774        public ClusterStatus run() throws IOException, InterruptedException {
775        ClusterMetrics metrics = cluster.getClusterStatus();
776        return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
777          arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
778          cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
779          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
780          metrics.getReduceSlotCapacity(), 
781          cluster.getJobTrackerStatus());
782        }
783      });
784    } catch (InterruptedException ie) {
785      throw new IOException(ie);
786    }
787  }
788    
789
790  /** 
791   * Get the jobs that are not completed and not failed.
792   * 
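   * <p>For instance, to print the id of every job still in flight (a sketch):</p>
   * <pre>
   *     for (JobStatus status : client.jobsToComplete()) {
   *       System.out.println(status.getJobID());
   *     }
   * </pre>
   *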
793   * @return array of {@link JobStatus} for the running/to-be-run jobs.
794   * @throws IOException
795   */
796  public JobStatus[] jobsToComplete() throws IOException {
797    List<JobStatus> stats = new ArrayList<JobStatus>();
798    for (JobStatus stat : getAllJobs()) {
799      if (!stat.isJobComplete()) {
800        stats.add(stat);
801      }
802    }
803    return stats.toArray(new JobStatus[0]);
804  }
805
806  /** 
807   * Get the jobs that are submitted.
808   * 
809   * @return array of {@link JobStatus} for the submitted jobs.
810   * @throws IOException
811   */
812  public JobStatus[] getAllJobs() throws IOException {
813    try {
814      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
815          clientUgi.doAs(new PrivilegedExceptionAction<
816              org.apache.hadoop.mapreduce.JobStatus[]> () {
817            public org.apache.hadoop.mapreduce.JobStatus[] run() 
818                throws IOException, InterruptedException {
819              return cluster.getAllJobStatuses();
820            }
821          });
822      JobStatus[] stats = new JobStatus[jobs.length];
823      for (int i = 0; i < jobs.length; i++) {
824        stats[i] = JobStatus.downgrade(jobs[i]);
825      }
826      return stats;
827    } catch (InterruptedException ie) {
828      throw new IOException(ie);
829    }
830  }
831  
832  /** 
833   * Utility that submits a job, then polls for progress until the job is
834   * complete.
835   * 
836   * @param job the job configuration.
837   * @throws IOException if the job fails
838   */
839  public static RunningJob runJob(JobConf job) throws IOException {
840    JobClient jc = new JobClient(job);
841    RunningJob rj = jc.submitJob(job);
842    try {
843      if (!jc.monitorAndPrintJob(job, rj)) {
844        throw new IOException("Job failed!");
845      }
846    } catch (InterruptedException ie) {
847      Thread.currentThread().interrupt();
848    }
849    return rj;
850  }
851  
852  /**
853   * Monitor a job and print status in real-time as progress is made and tasks 
854   * fail.
855   * @param conf the job's configuration
856   * @param job the job to track
857   * @return true if the job succeeded
858   * @throws IOException if communication to the JobTracker fails
859   */
860  public boolean monitorAndPrintJob(JobConf conf, 
861                                    RunningJob job
862  ) throws IOException, InterruptedException {
863    return ((NetworkedJob)job).monitorAndPrintJob();
864  }
865
866  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
867    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
868  }
869  
  static Configuration getConfiguration(String jobTrackerSpec) {
872    Configuration conf = new Configuration();
873    if (jobTrackerSpec != null) {        
874      if (jobTrackerSpec.indexOf(":") >= 0) {
875        conf.set("mapred.job.tracker", jobTrackerSpec);
876      } else {
877        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
878        URL validate = conf.getResource(classpathFile);
879        if (validate == null) {
880          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
881        }
882        conf.addResource(classpathFile);
883      }
884    }
885    return conf;
886  }
887
888  /**
   * Sets the output filter for tasks.  Only those tasks are printed whose
   * output matches the filter.
891   * @param newValue task filter.
892   */
893  @Deprecated
894  public void setTaskOutputFilter(TaskStatusFilter newValue){
895    this.taskOutputFilter = newValue;
896  }
897    
898  /**
899   * Get the task output filter out of the JobConf.
900   * 
901   * @param job the JobConf to examine.
902   * @return the filter level.
903   */
904  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
905    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
906                                            "FAILED"));
907  }
908    
909  /**
910   * Modify the JobConf to set the task output filter.
911   * 
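   * <p>For example, to have the client print the output of all tasks rather
   * than just the failed ones (a sketch):</p>
   * <pre>
   *     JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
   * </pre>
   *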
912   * @param job the JobConf to modify.
913   * @param newValue the value to set.
914   */
915  public static void setTaskOutputFilter(JobConf job, 
916                                         TaskStatusFilter newValue) {
917    job.set("jobclient.output.filter", newValue.toString());
918  }
919    
920  /**
921   * Returns task output filter.
922   * @return task filter. 
923   */
924  @Deprecated
925  public TaskStatusFilter getTaskOutputFilter(){
926    return this.taskOutputFilter; 
927  }
928
929  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
930      String counterGroupName, String counterName) throws IOException {
931    Counters counters = Counters.downgrade(cntrs);
932    return counters.findCounter(counterGroupName, counterName).getValue();
933  }
934
935  /**
936   * Get status information about the max available Maps in the cluster.
937   *  
938   * @return the max available Maps in the cluster
939   * @throws IOException
940   */
941  public int getDefaultMaps() throws IOException {
942    try {
943      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
944        @Override
945        public Integer run() throws IOException, InterruptedException {
946          return cluster.getClusterStatus().getMapSlotCapacity();
947        }
948      });
949    } catch (InterruptedException ie) {
950      throw new IOException(ie);
951    }
952  }
953
954  /**
955   * Get status information about the max available Reduces in the cluster.
956   *  
957   * @return the max available Reduces in the cluster
958   * @throws IOException
959   */
960  public int getDefaultReduces() throws IOException {
961    try {
962      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
963        @Override
964        public Integer run() throws IOException, InterruptedException {
965          return cluster.getClusterStatus().getReduceSlotCapacity();
966        }
967      });
968    } catch (InterruptedException ie) {
969      throw new IOException(ie);
970    }
971  }
972
973  /**
974   * Grab the jobtracker system directory path where job-specific files are to be placed.
975   * 
976   * @return the system directory where job-specific files are to be placed.
977   */
978  public Path getSystemDir() {
979    try {
980      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
981        @Override
982        public Path run() throws IOException, InterruptedException {
983          return cluster.getSystemDir();
984        }
985      });
    } catch (IOException ioe) {
987      return null;
988    } catch (InterruptedException ie) {
989      return null;
990    }
991  }
992
993  /**
994   * Checks if the job directory is clean and has all the required components
995   * for (re) starting the job
996   */
997  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
998      throws IOException {
999    FileStatus[] contents = fs.listStatus(jobDirPath);
1000    int matchCount = 0;
1001    if (contents != null && contents.length >= 2) {
1002      for (FileStatus status : contents) {
1003        if ("job.xml".equals(status.getPath().getName())) {
1004          ++matchCount;
1005        }
1006        if ("job.split".equals(status.getPath().getName())) {
1007          ++matchCount;
1008        }
1009      }
1010      if (matchCount == 2) {
1011        return true;
1012      }
1013    }
1014    return false;
1015  }
1016
1017  /**
1018   * Fetch the staging area directory for the application
1019   * 
1020   * @return path to staging area directory
1021   * @throws IOException
1022   */
1023  public Path getStagingAreaDir() throws IOException {
1024    try {
1025      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
1026        @Override
1027        public Path run() throws IOException, InterruptedException {
1028          return cluster.getStagingAreaDir();
1029        }
1030      });
1031    } catch (InterruptedException ie) {
1032      // throw RuntimeException instead for compatibility reasons
1033      throw new RuntimeException(ie);
1034    }
1035  }
1036
1037  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
1038    JobQueueInfo ret = new JobQueueInfo(queue);
1039    // make sure to convert any children
1040    if (queue.getQueueChildren().size() > 0) {
1041      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
1042          .getQueueChildren().size());
1043      for (QueueInfo child : queue.getQueueChildren()) {
1044        childQueues.add(getJobQueueInfo(child));
1045      }
1046      ret.setChildren(childQueues);
1047    }
1048    return ret;
1049  }
1050
1051  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
1052      throws IOException {
1053    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
1054    for (int i = 0; i < queues.length; i++) {
1055      ret[i] = getJobQueueInfo(queues[i]);
1056    }
1057    return ret;
1058  }
1059
1060  /**
   * Returns an array of queue information objects about the root-level
   * queues configured.
1063   *
1064   * @return the array of root level JobQueueInfo objects
1065   * @throws IOException
1066   */
1067  public JobQueueInfo[] getRootQueues() throws IOException {
1068    try {
1069      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1070        public JobQueueInfo[] run() throws IOException, InterruptedException {
1071          return getJobQueueInfoArray(cluster.getRootQueues());
1072        }
1073      });
1074    } catch (InterruptedException ie) {
1075      throw new IOException(ie);
1076    }
1077  }
1078
1079  /**
1080   * Returns an array of queue information objects about immediate children
1081   * of queue queueName.
1082   * 
   * @param queueName the name of the parent queue
1084   * @return the array of immediate children JobQueueInfo objects
1085   * @throws IOException
1086   */
1087  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
1088    try {
1089      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1090        public JobQueueInfo[] run() throws IOException, InterruptedException {
1091          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
1092        }
1093      });
1094    } catch (InterruptedException ie) {
1095      throw new IOException(ie);
1096    }
1097  }
1098  
1099  /**
1100   * Return an array of queue information objects about all the Job Queues
1101   * configured.
1102   * 
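   * <p>For example, to list the names of all configured queues (a sketch):</p>
   * <pre>
   *     for (JobQueueInfo queue : client.getQueues()) {
   *       System.out.println(queue.getQueueName());
   *     }
   * </pre>
   *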
1103   * @return Array of JobQueueInfo objects
1104   * @throws IOException
1105   */
1106  public JobQueueInfo[] getQueues() throws IOException {
1107    try {
1108      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1109        public JobQueueInfo[] run() throws IOException, InterruptedException {
1110          return getJobQueueInfoArray(cluster.getQueues());
1111        }
1112      });
1113    } catch (InterruptedException ie) {
1114      throw new IOException(ie);
1115    }
1116  }
1117  
1118  /**
   * Gets all the jobs which were added to a particular Job Queue.
1120   * 
1121   * @param queueName name of the Job Queue
1122   * @return Array of jobs present in the job queue
1123   * @throws IOException
1124   */
1125  
1126  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
1127    try {
1128      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
1129        @Override
1130        public QueueInfo run() throws IOException, InterruptedException {
1131          return cluster.getQueue(queueName);
1132        }
1133      });
1134      if (queue == null) {
1135        return null;
1136      }
1137      org.apache.hadoop.mapreduce.JobStatus[] stats = 
1138        queue.getJobStatuses();
1139      JobStatus[] ret = new JobStatus[stats.length];
1140      for (int i = 0 ; i < stats.length; i++ ) {
1141        ret[i] = JobStatus.downgrade(stats[i]);
1142      }
1143      return ret;
1144    } catch (InterruptedException ie) {
1145      throw new IOException(ie);
1146    }
1147  }
1148  
1149  /**
   * Gets the queue information associated with a particular Job Queue.
   * 
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
1154   * @throws IOException
1155   */
1156  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
1157    try {
1158      QueueInfo queueInfo = clientUgi.doAs(new 
1159          PrivilegedExceptionAction<QueueInfo>() {
1160        public QueueInfo run() throws IOException, InterruptedException {
1161          return cluster.getQueue(queueName);
1162        }
1163      });
1164      if (queueInfo != null) {
1165        return new JobQueueInfo(queueInfo);
1166      }
1167      return null;
1168    } catch (InterruptedException ie) {
1169      throw new IOException(ie);
1170    }
1171  }
1172  
1173  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
1176   * @throws IOException
1177   */
1178  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
1179    try {
1180      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
1181        clientUgi.doAs(new 
1182            PrivilegedExceptionAction
1183            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
1184              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
1185              throws IOException, InterruptedException {
1186                return cluster.getQueueAclsForCurrentUser();
1187              }
1188        });
1189      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
1190      for (int i = 0 ; i < acls.length; i++ ) {
1191        ret[i] = QueueAclsInfo.downgrade(acls[i]);
1192      }
1193      return ret;
1194    } catch (InterruptedException ie) {
1195      throw new IOException(ie);
1196    }
1197  }
1198
1199  /**
1200   * Get a delegation token for the user from the JobTracker.
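   * <p>A sketch of typical use; the renewer name is illustrative and the
   * token is simply added to the current user's credentials:</p>
   * <pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("jobRenewerPrincipal"));
   *     UserGroupInformation.getCurrentUser().addToken(token);
   * </pre>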
1201   * @param renewer the user who can renew the token
1202   * @return the new token
1203   * @throws IOException
1204   */
1205  public Token<DelegationTokenIdentifier> 
1206    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
1207    return clientUgi.doAs(new 
1208        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
1209      public Token<DelegationTokenIdentifier> run() throws IOException, 
1210      InterruptedException {
1211        return cluster.getDelegationToken(renewer);
1212      }
1213    });
1214  }
1215
1216  /**
1217   * Renew a delegation token
1218   * @param token the token to renew
   * @return the new expiration time of the token
1220   * @throws InvalidToken
1221   * @throws IOException
1222   * @deprecated Use {@link Token#renew} instead
1223   */
1224  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
1225                                   ) throws InvalidToken, IOException, 
1226                                            InterruptedException {
1227    return token.renew(getConf());
1228  }
1229
1230  /**
1231   * Cancel a delegation token from the JobTracker
1232   * @param token the token to cancel
1233   * @throws IOException
1234   * @deprecated Use {@link Token#cancel} instead
1235   */
1236  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
1237                                    ) throws InvalidToken, IOException, 
1238                                             InterruptedException {
1239    token.cancel(getConf());
1240  }
1241
  /**
   * Command-line entry point; runs this tool via {@link ToolRunner}.
   */
1244  public static void main(String argv[]) throws Exception {
1245    int res = ToolRunner.run(new JobClient(), argv);
1246    System.exit(res);
1247  }
1248}
1249