001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.URL;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.Text;
035import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
036import org.apache.hadoop.mapreduce.Cluster;
037import org.apache.hadoop.mapreduce.ClusterMetrics;
038import org.apache.hadoop.mapreduce.Job;
039import org.apache.hadoop.mapreduce.QueueInfo;
040import org.apache.hadoop.mapreduce.TaskTrackerInfo;
041import org.apache.hadoop.mapreduce.TaskType;
042import org.apache.hadoop.mapreduce.filecache.DistributedCache;
043import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
044import org.apache.hadoop.mapreduce.tools.CLI;
045import org.apache.hadoop.mapreduce.util.ConfigUtil;
046import org.apache.hadoop.security.UserGroupInformation;
047import org.apache.hadoop.security.token.SecretManager.InvalidToken;
048import org.apache.hadoop.security.token.Token;
049import org.apache.hadoop.security.token.TokenRenewer;
050import org.apache.hadoop.util.Tool;
051import org.apache.hadoop.util.ToolRunner;
052
053/**
054 * <code>JobClient</code> is the primary interface for the user-job to interact
055 * with the cluster.
056 * 
057 * <code>JobClient</code> provides facilities to submit jobs, track their 
058 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
059 * status information etc.
060 * 
061 * <p>The job submission process involves:
062 * <ol>
063 *   <li>
064 *   Checking the input and output specifications of the job.
065 *   </li>
066 *   <li>
067 *   Computing the {@link InputSplit}s for the job.
068 *   </li>
069 *   <li>
070 *   Setup the requisite accounting information for the {@link DistributedCache} 
071 *   of the job, if necessary.
072 *   </li>
073 *   <li>
074 *   Copying the job's jar and configuration to the map-reduce system directory 
075 *   on the distributed file-system. 
076 *   </li>
077 *   <li>
078 *   Submitting the job to the cluster and optionally monitoring
079 *   it's status.
080 *   </li>
081 * </ol></p>
082 *  
083 * Normally the user creates the application, describes various facets of the
084 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
085 * the job and monitor its progress.
086 * 
087 * <p>Here is an example on how to use <code>JobClient</code>:</p>
088 * <p><blockquote><pre>
089 *     // Create a new JobConf
090 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
091 *     
092 *     // Specify various job-specific parameters     
093 *     job.setJobName("myjob");
094 *     
095 *     job.setInputPath(new Path("in"));
096 *     job.setOutputPath(new Path("out"));
097 *     
098 *     job.setMapperClass(MyJob.MyMapper.class);
099 *     job.setReducerClass(MyJob.MyReducer.class);
100 *
101 *     // Submit the job, then poll for progress until the job is complete
102 *     JobClient.runJob(job);
103 * </pre></blockquote></p>
104 * 
105 * <h4 id="JobControl">Job Control</h4>
106 * 
107 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 
108 * which cannot be done via a single map-reduce job. This is fairly easy since 
109 * the output of the job, typically, goes to distributed file-system and that 
110 * can be used as the input for the next job.</p>
111 * 
112 * <p>However, this also means that the onus on ensuring jobs are complete 
113 * (success/failure) lies squarely on the clients. In such situations the 
114 * various job-control options are:
115 * <ol>
116 *   <li>
117 *   {@link #runJob(JobConf)} : submits the job and returns only after 
118 *   the job has completed.
119 *   </li>
120 *   <li>
121 *   {@link #submitJob(JobConf)} : only submits the job, then poll the 
122 *   returned handle to the {@link RunningJob} to query status and make 
123 *   scheduling decisions.
124 *   </li>
125 *   <li>
126 *   {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
127 *   on job-completion, thus avoiding polling.
128 *   </li>
129 * </ol></p>
130 * 
131 * @see JobConf
132 * @see ClusterStatus
133 * @see Tool
134 * @see DistributedCache
135 */
136@InterfaceAudience.Public
137@InterfaceStability.Stable
138public class JobClient extends CLI {
139  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
140  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
141  
142  static{
143    ConfigUtil.loadResources();
144  }
145
146  /**
147   * A NetworkedJob is an implementation of RunningJob.  It holds
148   * a JobProfile object to provide some info, and interacts with the
149   * remote service to provide certain functionality.
150   */
151  static class NetworkedJob implements RunningJob {
152    Job job;
153    /**
154     * We store a JobProfile and a timestamp for when we last
155     * acquired the job profile.  If the job is null, then we cannot
156     * perform any of the tasks.  The job might be null if the cluster
157     * has completely forgotten about the job.  (eg, 24 hours after the
158     * job completes.)
159     */
160    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
161      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
162    }
163
164    public NetworkedJob(Job job) throws IOException {
165      this.job = job;
166    }
167
168    public Configuration getConfiguration() {
169      return job.getConfiguration();
170    }
171
172    /**
173     * An identifier for the job
174     */
175    public JobID getID() {
176      return JobID.downgrade(job.getJobID());
177    }
178    
179    /** @deprecated This method is deprecated and will be removed. Applications should 
180     * rather use {@link #getID()}.*/
181    @Deprecated
182    public String getJobID() {
183      return getID().toString();
184    }
185    
186    /**
187     * The user-specified job name
188     */
189    public String getJobName() {
190      return job.getJobName();
191    }
192
193    /**
194     * The name of the job file
195     */
196    public String getJobFile() {
197      return job.getJobFile();
198    }
199
200    /**
201     * A URL where the job's status can be seen
202     */
203    public String getTrackingURL() {
204      return job.getTrackingURL();
205    }
206
207    /**
208     * A float between 0.0 and 1.0, indicating the % of map work
209     * completed.
210     */
211    public float mapProgress() throws IOException {
212      try {
213        return job.mapProgress();
214      } catch (InterruptedException ie) {
215        throw new IOException(ie);
216      }
217    }
218
219    /**
220     * A float between 0.0 and 1.0, indicating the % of reduce work
221     * completed.
222     */
223    public float reduceProgress() throws IOException {
224      try {
225        return job.reduceProgress();
226      } catch (InterruptedException ie) {
227        throw new IOException(ie);
228      }
229    }
230
231    /**
232     * A float between 0.0 and 1.0, indicating the % of cleanup work
233     * completed.
234     */
235    public float cleanupProgress() throws IOException {
236      try {
237        return job.cleanupProgress();
238      } catch (InterruptedException ie) {
239        throw new IOException(ie);
240      }
241    }
242
243    /**
244     * A float between 0.0 and 1.0, indicating the % of setup work
245     * completed.
246     */
247    public float setupProgress() throws IOException {
248      try {
249        return job.setupProgress();
250      } catch (InterruptedException ie) {
251        throw new IOException(ie);
252      }
253    }
254
255    /**
256     * Returns immediately whether the whole job is done yet or not.
257     */
258    public synchronized boolean isComplete() throws IOException {
259      try {
260        return job.isComplete();
261      } catch (InterruptedException ie) {
262        throw new IOException(ie);
263      }
264    }
265
266    /**
267     * True iff job completed successfully.
268     */
269    public synchronized boolean isSuccessful() throws IOException {
270      try {
271        return job.isSuccessful();
272      } catch (InterruptedException ie) {
273        throw new IOException(ie);
274      }
275    }
276
277    /**
278     * Blocks until the job is finished
279     */
280    public void waitForCompletion() throws IOException {
281      try {
282        job.waitForCompletion(false);
283      } catch (InterruptedException ie) {
284        throw new IOException(ie);
285      } catch (ClassNotFoundException ce) {
286        throw new IOException(ce);
287      }
288    }
289
290    /**
291     * Tells the service to get the state of the current job.
292     */
293    public synchronized int getJobState() throws IOException {
294      try {
295        return job.getJobState().getValue();
296      } catch (InterruptedException ie) {
297        throw new IOException(ie);
298      }
299    }
300    
301    /**
302     * Tells the service to terminate the current job.
303     */
304    public synchronized void killJob() throws IOException {
305      try {
306        job.killJob();
307      } catch (InterruptedException ie) {
308        throw new IOException(ie);
309      }
310    }
311   
312    
313    /** Set the priority of the job.
314    * @param priority new priority of the job. 
315    */
316    public synchronized void setJobPriority(String priority) 
317                                                throws IOException {
318      try {
319        job.setPriority(
320          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
321      } catch (InterruptedException ie) {
322        throw new IOException(ie);
323      }
324    }
325    
326    /**
327     * Kill indicated task attempt.
328     * @param taskId the id of the task to kill.
329     * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
330     * it is just killed, w/o affecting job failure status.
331     */
332    public synchronized void killTask(TaskAttemptID taskId,
333        boolean shouldFail) throws IOException {
334      try {
335        if (shouldFail) {
336          job.failTask(taskId);
337        } else {
338          job.killTask(taskId);
339        }
340      } catch (InterruptedException ie) {
341        throw new IOException(ie);
342      }
343    }
344
345    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
346    @Deprecated
347    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
348      killTask(TaskAttemptID.forName(taskId), shouldFail);
349    }
350    
351    /**
352     * Fetch task completion events from cluster for this job. 
353     */
354    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
355        int startFrom) throws IOException {
356      try {
357        org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls = 
358          job.getTaskCompletionEvents(startFrom, 10);
359        TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
360        for (int i = 0 ; i < acls.length; i++ ) {
361          ret[i] = TaskCompletionEvent.downgrade(acls[i]);
362        }
363        return ret;
364      } catch (InterruptedException ie) {
365        throw new IOException(ie);
366      }
367    }
368
369    /**
370     * Dump stats to screen
371     */
372    @Override
373    public String toString() {
374      return job.toString();
375    }
376        
377    /**
378     * Returns the counters for this job
379     */
380    public Counters getCounters() throws IOException {
381      try { 
382        Counters result = null;
383        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
384        if(temp != null) {
385          result = Counters.downgrade(temp);
386        }
387        return result;
388      } catch (InterruptedException ie) {
389        throw new IOException(ie);
390      }
391    }
392    
393    @Override
394    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
395      try { 
396        return job.getTaskDiagnostics(id);
397      } catch (InterruptedException ie) {
398        throw new IOException(ie);
399      }
400    }
401
402    public String getHistoryUrl() throws IOException {
403      try {
404        return job.getHistoryUrl();
405      } catch (InterruptedException ie) {
406        throw new IOException(ie);
407      }
408    }
409
410    public boolean isRetired() throws IOException {
411      try {
412        return job.isRetired();
413      } catch (InterruptedException ie) {
414        throw new IOException(ie);
415      }
416    }
417    
418    boolean monitorAndPrintJob() throws IOException, InterruptedException {
419      return job.monitorAndPrintJob();
420    }
421    
422    @Override
423    public String getFailureInfo() throws IOException {
424      try {
425        return job.getStatus().getFailureInfo();
426      } catch (InterruptedException ie) {
427        throw new IOException(ie);
428      }
429    }
430
431    @Override
432    public JobStatus getJobStatus() throws IOException {
433      try {
434        return JobStatus.downgrade(job.getStatus());
435      } catch (InterruptedException ie) {
436        throw new IOException(ie);
437      }
438    }
439  }
440
441  /**
442   * Ugi of the client. We store this ugi when the client is created and 
443   * then make sure that the same ugi is used to run the various protocols.
444   */
445  UserGroupInformation clientUgi;
446  
447  /**
448   * Create a job client.
449   */
450  public JobClient() {
451  }
452    
453  /**
454   * Build a job client with the given {@link JobConf}, and connect to the 
455   * default cluster
456   * 
457   * @param conf the job configuration.
458   * @throws IOException
459   */
460  public JobClient(JobConf conf) throws IOException {
461    init(conf);
462  }
463
464  /**
465   * Build a job client with the given {@link Configuration}, 
466   * and connect to the default cluster
467   * 
468   * @param conf the configuration.
469   * @throws IOException
470   */
471  public JobClient(Configuration conf) throws IOException {
472    init(new JobConf(conf));
473  }
474
475  /**
476   * Connect to the default cluster
477   * @param conf the job configuration.
478   * @throws IOException
479   */
480  public void init(JobConf conf) throws IOException {
481    setConf(conf);
482    cluster = new Cluster(conf);
483    clientUgi = UserGroupInformation.getCurrentUser();
484  }
485
486  /**
487   * Build a job client, connect to the indicated job tracker.
488   * 
489   * @param jobTrackAddr the job tracker to connect to.
490   * @param conf configuration.
491   */
492  public JobClient(InetSocketAddress jobTrackAddr, 
493                   Configuration conf) throws IOException {
494    cluster = new Cluster(jobTrackAddr, conf);
495    clientUgi = UserGroupInformation.getCurrentUser();
496  }
497
498  /**
499   * Close the <code>JobClient</code>.
500   */
501  public synchronized void close() throws IOException {
502    cluster.close();
503  }
504
505  /**
506   * Get a filesystem handle.  We need this to prepare jobs
507   * for submission to the MapReduce system.
508   * 
509   * @return the filesystem handle.
510   */
511  public synchronized FileSystem getFs() throws IOException {
512    try { 
513      return cluster.getFileSystem();
514    } catch (InterruptedException ie) {
515      throw new IOException(ie);
516    }
517  }
518  
519  /**
520   * Get a handle to the Cluster
521   */
522  public Cluster getClusterHandle() {
523    return cluster;
524  }
525  
526  /**
527   * Submit a job to the MR system.
528   * 
529   * This returns a handle to the {@link RunningJob} which can be used to track
530   * the running-job.
531   * 
532   * @param jobFile the job configuration.
533   * @return a handle to the {@link RunningJob} which can be used to track the
534   *         running-job.
535   * @throws FileNotFoundException
536   * @throws InvalidJobConfException
537   * @throws IOException
538   */
539  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
540                                                     InvalidJobConfException, 
541                                                     IOException {
542    // Load in the submitted job details
543    JobConf job = new JobConf(jobFile);
544    return submitJob(job);
545  }
546    
547  /**
548   * Submit a job to the MR system.
549   * This returns a handle to the {@link RunningJob} which can be used to track
550   * the running-job.
551   * 
552   * @param conf the job configuration.
553   * @return a handle to the {@link RunningJob} which can be used to track the
554   *         running-job.
555   * @throws FileNotFoundException
556   * @throws IOException
557   */
558  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
559                                                  IOException {
560    try {
561      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
562      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
563      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
564        @Override
565        public Job run() throws IOException, ClassNotFoundException, 
566          InterruptedException {
567          Job job = Job.getInstance(conf);
568          job.submit();
569          return job;
570        }
571      });
572      // update our Cluster instance with the one created by Job for submission
573      // (we can't pass our Cluster instance to Job, since Job wraps the config
574      // instance, and the two configs would then diverge)
575      cluster = job.getCluster();
576      return new NetworkedJob(job);
577    } catch (InterruptedException ie) {
578      throw new IOException("interrupted", ie);
579    }
580  }
581
582  private Job getJobUsingCluster(final JobID jobid) throws IOException,
583  InterruptedException {
584    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
585      public Job run() throws IOException, InterruptedException  {
586       return cluster.getJob(jobid);
587      }
588    });
589  }
590  /**
591   * Get an {@link RunningJob} object to track an ongoing job.  Returns
592   * null if the id does not correspond to any known job.
593   * 
594   * @param jobid the jobid of the job.
595   * @return the {@link RunningJob} handle to track the job, null if the 
596   *         <code>jobid</code> doesn't correspond to any known job.
597   * @throws IOException
598   */
599  public RunningJob getJob(final JobID jobid) throws IOException {
600    try {
601      
602      Job job = getJobUsingCluster(jobid);
603      if (job != null) {
604        JobStatus status = JobStatus.downgrade(job.getStatus());
605        if (status != null) {
606          return new NetworkedJob(status, cluster);
607        } 
608      }
609    } catch (InterruptedException ie) {
610      throw new IOException(ie);
611    }
612    return null;
613  }
614
615  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
616   */
617  @Deprecated
618  public RunningJob getJob(String jobid) throws IOException {
619    return getJob(JobID.forName(jobid));
620  }
621  
622  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
623  
624  /**
625   * Get the information of the current state of the map tasks of a job.
626   * 
627   * @param jobId the job to query.
628   * @return the list of all of the map tips.
629   * @throws IOException
630   */
631  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
632    return getTaskReports(jobId, TaskType.MAP);
633  }
634  
635  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
636    IOException {
637    try {
638      Job j = getJobUsingCluster(jobId);
639      if(j == null) {
640        return EMPTY_TASK_REPORTS;
641      }
642      return TaskReport.downgradeArray(j.getTaskReports(type));
643    } catch (InterruptedException ie) {
644      throw new IOException(ie);
645    }
646  }
647  
648  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
649  @Deprecated
650  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
651    return getMapTaskReports(JobID.forName(jobId));
652  }
653  
654  /**
655   * Get the information of the current state of the reduce tasks of a job.
656   * 
657   * @param jobId the job to query.
658   * @return the list of all of the reduce tips.
659   * @throws IOException
660   */    
661  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
662    return getTaskReports(jobId, TaskType.REDUCE);
663  }
664
665  /**
666   * Get the information of the current state of the cleanup tasks of a job.
667   * 
668   * @param jobId the job to query.
669   * @return the list of all of the cleanup tips.
670   * @throws IOException
671   */    
672  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
673    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
674  }
675
676  /**
677   * Get the information of the current state of the setup tasks of a job.
678   * 
679   * @param jobId the job to query.
680   * @return the list of all of the setup tips.
681   * @throws IOException
682   */    
683  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
684    return getTaskReports(jobId, TaskType.JOB_SETUP);
685  }
686
687  
688  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
689  @Deprecated
690  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
691    return getReduceTaskReports(JobID.forName(jobId));
692  }
693  
694  /**
695   * Display the information about a job's tasks, of a particular type and
696   * in a particular state
697   * 
698   * @param jobId the ID of the job
699   * @param type the type of the task (map/reduce/setup/cleanup)
700   * @param state the state of the task 
701   * (pending/running/completed/failed/killed)
702   */
703  public void displayTasks(final JobID jobId, String type, String state) 
704  throws IOException {
705    try {
706      Job job = getJobUsingCluster(jobId);
707      super.displayTasks(job, type, state);
708    } catch (InterruptedException ie) {
709      throw new IOException(ie);
710    }
711  }
712  
713  /**
714   * Get status information about the Map-Reduce cluster.
715   *  
716   * @return the status information about the Map-Reduce cluster as an object
717   *         of {@link ClusterStatus}.
718   * @throws IOException
719   */
720  public ClusterStatus getClusterStatus() throws IOException {
721    try {
722      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
723        public ClusterStatus run()  throws IOException, InterruptedException {
724          ClusterMetrics metrics = cluster.getClusterStatus();
725          return new ClusterStatus(metrics.getTaskTrackerCount(),
726              metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
727              metrics.getOccupiedMapSlots(),
728              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
729              metrics.getReduceSlotCapacity(),
730              cluster.getJobTrackerStatus(),
731              metrics.getDecommissionedTaskTrackerCount());
732        }
733      });
734    }
735      catch (InterruptedException ie) {
736      throw new IOException(ie);
737    }
738  }
739
740  private  Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
741    Collection<String> list = new ArrayList<String>();
742    for (TaskTrackerInfo info: objs) {
743      list.add(info.getTaskTrackerName());
744    }
745    return list;
746  }
747
748  private  Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
749    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
750    for (TaskTrackerInfo info: objs) {
751      BlackListInfo binfo = new BlackListInfo();
752      binfo.setTrackerName(info.getTaskTrackerName());
753      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
754      binfo.setBlackListReport(info.getBlacklistReport());
755      list.add(binfo);
756    }
757    return list;
758  }
759
760  /**
761   * Get status information about the Map-Reduce cluster.
762   *  
763   * @param  detailed if true then get a detailed status including the
764   *         tracker names
765   * @return the status information about the Map-Reduce cluster as an object
766   *         of {@link ClusterStatus}.
767   * @throws IOException
768   */
769  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
770    try {
771      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
772        public ClusterStatus run() throws IOException, InterruptedException {
773        ClusterMetrics metrics = cluster.getClusterStatus();
774        return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
775          arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
776          cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
777          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
778          metrics.getReduceSlotCapacity(), 
779          cluster.getJobTrackerStatus());
780        }
781      });
782    } catch (InterruptedException ie) {
783      throw new IOException(ie);
784    }
785  }
786    
787
788  /** 
789   * Get the jobs that are not completed and not failed.
790   * 
791   * @return array of {@link JobStatus} for the running/to-be-run jobs.
792   * @throws IOException
793   */
794  public JobStatus[] jobsToComplete() throws IOException {
795    List<JobStatus> stats = new ArrayList<JobStatus>();
796    for (JobStatus stat : getAllJobs()) {
797      if (!stat.isJobComplete()) {
798        stats.add(stat);
799      }
800    }
801    return stats.toArray(new JobStatus[0]);
802  }
803
804  /** 
805   * Get the jobs that are submitted.
806   * 
807   * @return array of {@link JobStatus} for the submitted jobs.
808   * @throws IOException
809   */
810  public JobStatus[] getAllJobs() throws IOException {
811    try {
812      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
813          clientUgi.doAs(new PrivilegedExceptionAction<
814              org.apache.hadoop.mapreduce.JobStatus[]> () {
815            public org.apache.hadoop.mapreduce.JobStatus[] run() 
816                throws IOException, InterruptedException {
817              return cluster.getAllJobStatuses();
818            }
819          });
820      JobStatus[] stats = new JobStatus[jobs.length];
821      for (int i = 0; i < jobs.length; i++) {
822        stats[i] = JobStatus.downgrade(jobs[i]);
823      }
824      return stats;
825    } catch (InterruptedException ie) {
826      throw new IOException(ie);
827    }
828  }
829  
830  /** 
831   * Utility that submits a job, then polls for progress until the job is
832   * complete.
833   * 
834   * @param job the job configuration.
835   * @throws IOException if the job fails
836   */
837  public static RunningJob runJob(JobConf job) throws IOException {
838    JobClient jc = new JobClient(job);
839    RunningJob rj = jc.submitJob(job);
840    try {
841      if (!jc.monitorAndPrintJob(job, rj)) {
842        throw new IOException("Job failed!");
843      }
844    } catch (InterruptedException ie) {
845      Thread.currentThread().interrupt();
846    }
847    return rj;
848  }
849  
850  /**
851   * Monitor a job and print status in real-time as progress is made and tasks 
852   * fail.
853   * @param conf the job's configuration
854   * @param job the job to track
855   * @return true if the job succeeded
856   * @throws IOException if communication to the JobTracker fails
857   */
858  public boolean monitorAndPrintJob(JobConf conf, 
859                                    RunningJob job
860  ) throws IOException, InterruptedException {
861    return ((NetworkedJob)job).monitorAndPrintJob();
862  }
863
864  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
865    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
866  }
867  
868  static Configuration getConfiguration(String jobTrackerSpec)
869  {
870    Configuration conf = new Configuration();
871    if (jobTrackerSpec != null) {        
872      if (jobTrackerSpec.indexOf(":") >= 0) {
873        conf.set("mapred.job.tracker", jobTrackerSpec);
874      } else {
875        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
876        URL validate = conf.getResource(classpathFile);
877        if (validate == null) {
878          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
879        }
880        conf.addResource(classpathFile);
881      }
882    }
883    return conf;
884  }
885
886  /**
887   * Sets the output filter for tasks. only those tasks are printed whose
888   * output matches the filter. 
889   * @param newValue task filter.
890   */
891  @Deprecated
892  public void setTaskOutputFilter(TaskStatusFilter newValue){
893    this.taskOutputFilter = newValue;
894  }
895    
896  /**
897   * Get the task output filter out of the JobConf.
898   * 
899   * @param job the JobConf to examine.
900   * @return the filter level.
901   */
902  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
903    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
904                                            "FAILED"));
905  }
906    
907  /**
908   * Modify the JobConf to set the task output filter.
909   * 
910   * @param job the JobConf to modify.
911   * @param newValue the value to set.
912   */
913  public static void setTaskOutputFilter(JobConf job, 
914                                         TaskStatusFilter newValue) {
915    job.set("jobclient.output.filter", newValue.toString());
916  }
917    
918  /**
919   * Returns task output filter.
920   * @return task filter. 
921   */
922  @Deprecated
923  public TaskStatusFilter getTaskOutputFilter(){
924    return this.taskOutputFilter; 
925  }
926
927  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
928      String counterGroupName, String counterName) throws IOException {
929    Counters counters = Counters.downgrade(cntrs);
930    return counters.findCounter(counterGroupName, counterName).getValue();
931  }
932
933  /**
934   * Get status information about the max available Maps in the cluster.
935   *  
936   * @return the max available Maps in the cluster
937   * @throws IOException
938   */
939  public int getDefaultMaps() throws IOException {
940    try {
941      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
942        @Override
943        public Integer run() throws IOException, InterruptedException {
944          return cluster.getClusterStatus().getMapSlotCapacity();
945        }
946      });
947    } catch (InterruptedException ie) {
948      throw new IOException(ie);
949    }
950  }
951
952  /**
953   * Get status information about the max available Reduces in the cluster.
954   *  
955   * @return the max available Reduces in the cluster
956   * @throws IOException
957   */
958  public int getDefaultReduces() throws IOException {
959    try {
960      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
961        @Override
962        public Integer run() throws IOException, InterruptedException {
963          return cluster.getClusterStatus().getReduceSlotCapacity();
964        }
965      });
966    } catch (InterruptedException ie) {
967      throw new IOException(ie);
968    }
969  }
970
971  /**
972   * Grab the jobtracker system directory path where job-specific files are to be placed.
973   * 
974   * @return the system directory where job-specific files are to be placed.
975   */
976  public Path getSystemDir() {
977    try {
978      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
979        @Override
980        public Path run() throws IOException, InterruptedException {
981          return cluster.getSystemDir();
982        }
983      });
984      } catch (IOException ioe) {
985      return null;
986    } catch (InterruptedException ie) {
987      return null;
988    }
989  }
990
991  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
992    JobQueueInfo ret = new JobQueueInfo(queue);
993    // make sure to convert any children
994    if (queue.getQueueChildren().size() > 0) {
995      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
996          .getQueueChildren().size());
997      for (QueueInfo child : queue.getQueueChildren()) {
998        childQueues.add(getJobQueueInfo(child));
999      }
1000      ret.setChildren(childQueues);
1001    }
1002    return ret;
1003  }
1004
1005  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
1006      throws IOException {
1007    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
1008    for (int i = 0; i < queues.length; i++) {
1009      ret[i] = getJobQueueInfo(queues[i]);
1010    }
1011    return ret;
1012  }
1013
1014  /**
1015   * Returns an array of queue information objects about root level queues
1016   * configured
1017   *
1018   * @return the array of root level JobQueueInfo objects
1019   * @throws IOException
1020   */
1021  public JobQueueInfo[] getRootQueues() throws IOException {
1022    try {
1023      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1024        public JobQueueInfo[] run() throws IOException, InterruptedException {
1025          return getJobQueueInfoArray(cluster.getRootQueues());
1026        }
1027      });
1028    } catch (InterruptedException ie) {
1029      throw new IOException(ie);
1030    }
1031  }
1032
1033  /**
1034   * Returns an array of queue information objects about immediate children
1035   * of queue queueName.
1036   * 
1037   * @param queueName
1038   * @return the array of immediate children JobQueueInfo objects
1039   * @throws IOException
1040   */
1041  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
1042    try {
1043      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1044        public JobQueueInfo[] run() throws IOException, InterruptedException {
1045          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
1046        }
1047      });
1048    } catch (InterruptedException ie) {
1049      throw new IOException(ie);
1050    }
1051  }
1052  
1053  /**
1054   * Return an array of queue information objects about all the Job Queues
1055   * configured.
1056   * 
1057   * @return Array of JobQueueInfo objects
1058   * @throws IOException
1059   */
1060  public JobQueueInfo[] getQueues() throws IOException {
1061    try {
1062      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1063        public JobQueueInfo[] run() throws IOException, InterruptedException {
1064          return getJobQueueInfoArray(cluster.getQueues());
1065        }
1066      });
1067    } catch (InterruptedException ie) {
1068      throw new IOException(ie);
1069    }
1070  }
1071  
1072  /**
1073   * Gets all the jobs which were added to particular Job Queue
1074   * 
1075   * @param queueName name of the Job Queue
1076   * @return Array of jobs present in the job queue
1077   * @throws IOException
1078   */
1079  
1080  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
1081    try {
1082      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
1083        @Override
1084        public QueueInfo run() throws IOException, InterruptedException {
1085          return cluster.getQueue(queueName);
1086        }
1087      });
1088      if (queue == null) {
1089        return null;
1090      }
1091      org.apache.hadoop.mapreduce.JobStatus[] stats = 
1092        queue.getJobStatuses();
1093      JobStatus[] ret = new JobStatus[stats.length];
1094      for (int i = 0 ; i < stats.length; i++ ) {
1095        ret[i] = JobStatus.downgrade(stats[i]);
1096      }
1097      return ret;
1098    } catch (InterruptedException ie) {
1099      throw new IOException(ie);
1100    }
1101  }
1102  
1103  /**
1104   * Gets the queue information associated to a particular Job Queue
1105   * 
1106   * @param queueName name of the job queue.
1107   * @return Queue information associated to particular queue.
1108   * @throws IOException
1109   */
1110  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
1111    try {
1112      QueueInfo queueInfo = clientUgi.doAs(new 
1113          PrivilegedExceptionAction<QueueInfo>() {
1114        public QueueInfo run() throws IOException, InterruptedException {
1115          return cluster.getQueue(queueName);
1116        }
1117      });
1118      if (queueInfo != null) {
1119        return new JobQueueInfo(queueInfo);
1120      }
1121      return null;
1122    } catch (InterruptedException ie) {
1123      throw new IOException(ie);
1124    }
1125  }
1126  
1127  /**
1128   * Gets the Queue ACLs for current user
1129   * @return array of QueueAclsInfo object for current user.
1130   * @throws IOException
1131   */
1132  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
1133    try {
1134      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
1135        clientUgi.doAs(new 
1136            PrivilegedExceptionAction
1137            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
1138              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
1139              throws IOException, InterruptedException {
1140                return cluster.getQueueAclsForCurrentUser();
1141              }
1142        });
1143      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
1144      for (int i = 0 ; i < acls.length; i++ ) {
1145        ret[i] = QueueAclsInfo.downgrade(acls[i]);
1146      }
1147      return ret;
1148    } catch (InterruptedException ie) {
1149      throw new IOException(ie);
1150    }
1151  }
1152
1153  /**
1154   * Get a delegation token for the user from the JobTracker.
1155   * @param renewer the user who can renew the token
1156   * @return the new token
1157   * @throws IOException
1158   */
1159  public Token<DelegationTokenIdentifier> 
1160    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
1161    return clientUgi.doAs(new 
1162        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
1163      public Token<DelegationTokenIdentifier> run() throws IOException, 
1164      InterruptedException {
1165        return cluster.getDelegationToken(renewer);
1166      }
1167    });
1168  }
1169
1170  /**
1171   * Renew a delegation token
1172   * @param token the token to renew
1173   * @return true if the renewal went well
1174   * @throws InvalidToken
1175   * @throws IOException
1176   * @deprecated Use {@link Token#renew} instead
1177   */
1178  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
1179                                   ) throws InvalidToken, IOException, 
1180                                            InterruptedException {
1181    return token.renew(getConf());
1182  }
1183
1184  /**
1185   * Cancel a delegation token from the JobTracker
1186   * @param token the token to cancel
1187   * @throws IOException
1188   * @deprecated Use {@link Token#cancel} instead
1189   */
1190  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
1191                                    ) throws InvalidToken, IOException, 
1192                                             InterruptedException {
1193    token.cancel(getConf());
1194  }
1195
1196  /**
1197   */
1198  public static void main(String argv[]) throws Exception {
1199    int res = ToolRunner.run(new JobClient(), argv);
1200    System.exit(res);
1201  }
1202}
1203