001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.net.InetSocketAddress;
023import java.net.URL;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.io.Text;
036import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
037import org.apache.hadoop.mapreduce.Cluster;
038import org.apache.hadoop.mapreduce.ClusterMetrics;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.mapreduce.MRJobConfig;
041import org.apache.hadoop.mapreduce.QueueInfo;
042import org.apache.hadoop.mapreduce.TaskTrackerInfo;
043import org.apache.hadoop.mapreduce.TaskType;
044import org.apache.hadoop.mapreduce.filecache.DistributedCache;
045import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
046import org.apache.hadoop.mapreduce.tools.CLI;
047import org.apache.hadoop.mapreduce.util.ConfigUtil;
048import org.apache.hadoop.security.UserGroupInformation;
049import org.apache.hadoop.security.token.SecretManager.InvalidToken;
050import org.apache.hadoop.security.token.Token;
051import org.apache.hadoop.security.token.TokenRenewer;
052import org.apache.hadoop.util.Tool;
053import org.apache.hadoop.util.ToolRunner;
054
055/**
056 * <code>JobClient</code> is the primary interface for the user-job to interact
057 * with the cluster.
058 * 
059 * <code>JobClient</code> provides facilities to submit jobs, track their 
060 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
061 * status information etc.
062 * 
063 * <p>The job submission process involves:
064 * <ol>
065 *   <li>
066 *   Checking the input and output specifications of the job.
067 *   </li>
068 *   <li>
069 *   Computing the {@link InputSplit}s for the job.
070 *   </li>
071 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
074 *   </li>
075 *   <li>
076 *   Copying the job's jar and configuration to the map-reduce system directory 
077 *   on the distributed file-system. 
078 *   </li>
079 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
082 *   </li>
083 * </ol>
084 *  
085 * Normally the user creates the application, describes various facets of the
086 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
087 * the job and monitor its progress.
088 * 
089 * <p>Here is an example on how to use <code>JobClient</code>:</p>
090 * <p><blockquote><pre>
091 *     // Create a new JobConf
092 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
093 *     
094 *     // Specify various job-specific parameters     
095 *     job.setJobName("myjob");
096 *     
097 *     job.setInputPath(new Path("in"));
098 *     job.setOutputPath(new Path("out"));
099 *     
100 *     job.setMapperClass(MyJob.MyMapper.class);
101 *     job.setReducerClass(MyJob.MyReducer.class);
102 *
103 *     // Submit the job, then poll for progress until the job is complete
104 *     JobClient.runJob(job);
105 * </pre></blockquote>
106 * 
107 * <b id="JobControl">Job Control</b>
108 * 
 * <p>At times clients chain map-reduce jobs to accomplish complex tasks 
 * which cannot be done via a single map-reduce job. This is fairly easy, since 
 * the output of a job typically goes to the distributed file-system and 
 * can be used as the input for the next job.</p>
 * 
 * <p>However, this also means that the onus of ensuring jobs are complete 
 * (success/failure) lies squarely on the clients. In such situations the 
 * various job-control options are:
117 * <ol>
118 *   <li>
119 *   {@link #runJob(JobConf)} : submits the job and returns only after 
120 *   the job has completed.
121 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client then 
 *   polls the returned {@link RunningJob} handle to query status and make 
 *   scheduling decisions, as shown in the example below.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   on job-completion, thus avoiding polling.
 *   </li>
131 * </ol>
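 * 
 * <p>For instance, a non-blocking submission followed by polling might look
 * like the sketch below. It is illustrative only: <code>MyJob</code>, the
 * poll interval and the error handling are placeholders, and the enclosing
 * method is assumed to declare <code>IOException</code> and
 * <code>InterruptedException</code>.</p>
 * <p><blockquote><pre>
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *     // ... configure the job as in the example above ...
 *
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);                  // poll every five seconds
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed: " + running.getFailureInfo());
 *     }
 * </pre></blockquote>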
132 * 
133 * @see JobConf
134 * @see ClusterStatus
135 * @see Tool
136 * @see DistributedCache
137 */
138@InterfaceAudience.Public
139@InterfaceStability.Stable
140public class JobClient extends CLI {
141
142  @InterfaceAudience.Private
143  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
144      "mapreduce.jobclient.retry.policy.enabled";
145  @InterfaceAudience.Private
146  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
147      false;
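  // The retry policy spec is a comma-separated list of (sleep-ms, retries)
  // pairs. For example, the default "10000,6,60000,10" is read as: retry up
  // to 6 times sleeping 10 seconds between attempts, then up to 10 more
  // times sleeping 60 seconds between attempts.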
148  @InterfaceAudience.Private
149  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
150      "mapreduce.jobclient.retry.policy.spec";
151  @InterfaceAudience.Private
152  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
153      "10000,6,60000,10"; // t1,n1,t2,n2,...
154
155  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
156  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
157  
158  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
159  private long retryInterval =
160      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;
161
162  static{
163    ConfigUtil.loadResources();
164  }
165
166  /**
   * A NetworkedJob is an implementation of RunningJob.  It wraps a
   * {@link Job} instance to provide job information, and interacts with the
   * remote service to provide certain functionality.
170   */
171  static class NetworkedJob implements RunningJob {
172    Job job;
    /**
     * Create a NetworkedJob from a {@link JobStatus}, connecting through
     * the given {@link Cluster}.  The underlying job might no longer be
     * known to the cluster if it has completely forgotten about it
     * (e.g. 24 hours after the job completes), in which case operations
     * on this handle will fail.
     */
180    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
181      this(status, cluster, new JobConf(status.getJobFile()));
182    }
183    
184    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
185        throws IOException {
186      this(Job.getInstance(cluster, status, conf));
187    }
188
189    public NetworkedJob(Job job) throws IOException {
190      this.job = job;
191    }
192
193    public Configuration getConfiguration() {
194      return job.getConfiguration();
195    }
196
197    /**
198     * An identifier for the job
199     */
200    public JobID getID() {
201      return JobID.downgrade(job.getJobID());
202    }
203    
    /** @deprecated This method will be removed; applications should 
     * use {@link #getID()} instead. */
206    @Deprecated
207    public String getJobID() {
208      return getID().toString();
209    }
210    
211    /**
212     * The user-specified job name
213     */
214    public String getJobName() {
215      return job.getJobName();
216    }
217
218    /**
219     * The name of the job file
220     */
221    public String getJobFile() {
222      return job.getJobFile();
223    }
224
225    /**
226     * A URL where the job's status can be seen
227     */
228    public String getTrackingURL() {
229      return job.getTrackingURL();
230    }
231
232    /**
233     * A float between 0.0 and 1.0, indicating the % of map work
234     * completed.
235     */
236    public float mapProgress() throws IOException {
237      return job.mapProgress();
238    }
239
240    /**
241     * A float between 0.0 and 1.0, indicating the % of reduce work
242     * completed.
243     */
244    public float reduceProgress() throws IOException {
245      return job.reduceProgress();
246    }
247
248    /**
249     * A float between 0.0 and 1.0, indicating the % of cleanup work
250     * completed.
251     */
252    public float cleanupProgress() throws IOException {
253      try {
254        return job.cleanupProgress();
255      } catch (InterruptedException ie) {
256        throw new IOException(ie);
257      }
258    }
259
260    /**
261     * A float between 0.0 and 1.0, indicating the % of setup work
262     * completed.
263     */
264    public float setupProgress() throws IOException {
265      return job.setupProgress();
266    }
267
268    /**
     * Returns, without blocking, whether the whole job is done yet or not.
270     */
271    public synchronized boolean isComplete() throws IOException {
272      return job.isComplete();
273    }
274
275    /**
276     * True iff job completed successfully.
277     */
278    public synchronized boolean isSuccessful() throws IOException {
279      return job.isSuccessful();
280    }
281
282    /**
283     * Blocks until the job is finished
284     */
285    public void waitForCompletion() throws IOException {
286      try {
287        job.waitForCompletion(false);
288      } catch (InterruptedException ie) {
289        throw new IOException(ie);
290      } catch (ClassNotFoundException ce) {
291        throw new IOException(ce);
292      }
293    }
294
295    /**
     * Queries the service for the current state of the job.
297     */
298    public synchronized int getJobState() throws IOException {
299      try {
300        return job.getJobState().getValue();
301      } catch (InterruptedException ie) {
302        throw new IOException(ie);
303      }
304    }
305    
306    /**
307     * Tells the service to terminate the current job.
308     */
309    public synchronized void killJob() throws IOException {
310      job.killJob();
311    }
312   
313    
314    /** Set the priority of the job.
315    * @param priority new priority of the job. 
316    */
317    public synchronized void setJobPriority(String priority) 
318                                                throws IOException {
319      try {
320        job.setPriority(
321          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
322      } catch (InterruptedException ie) {
323        throw new IOException(ie);
324      }
325    }
326    
327    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed tasks
     * list; otherwise it is just killed, without affecting the job's failure status.
332     */
333    public synchronized void killTask(TaskAttemptID taskId,
334        boolean shouldFail) throws IOException {
335      if (shouldFail) {
336        job.failTask(taskId);
337      } else {
338        job.killTask(taskId);
339      }
340    }
341
342    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
343    @Deprecated
344    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
345      killTask(TaskAttemptID.forName(taskId), shouldFail);
346    }
347    
348    /**
349     * Fetch task completion events from cluster for this job. 
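     *
     * <p>An illustrative way to drain the events currently available (the
     * offset bookkeeping is the caller's responsibility;
     * <code>runningJob</code> is a placeholder name):
     * <pre>
     *   int from = 0;
     *   TaskCompletionEvent[] events;
     *   do {
     *     events = runningJob.getTaskCompletionEvents(from);
     *     from += events.length;
     *   } while (events.length &gt; 0);
     * </pre>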
350     */
351    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
352        int startFrom) throws IOException {
353      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events = 
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
360        return ret;
361      } catch (InterruptedException ie) {
362        throw new IOException(ie);
363      }
364    }
365
366    /**
     * Returns a string representation of the job.
368     */
369    @Override
370    public String toString() {
371      return job.toString();
372    }
373        
374    /**
375     * Returns the counters for this job
376     */
377    public Counters getCounters() throws IOException {
378      Counters result = null;
379      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
380      if(temp != null) {
381        result = Counters.downgrade(temp);
382      }
383      return result;
384    }
385    
386    @Override
387    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
388      try { 
389        return job.getTaskDiagnostics(id);
390      } catch (InterruptedException ie) {
391        throw new IOException(ie);
392      }
393    }
394
395    public String getHistoryUrl() throws IOException {
396      try {
397        return job.getHistoryUrl();
398      } catch (InterruptedException ie) {
399        throw new IOException(ie);
400      }
401    }
402
403    public boolean isRetired() throws IOException {
404      try {
405        return job.isRetired();
406      } catch (InterruptedException ie) {
407        throw new IOException(ie);
408      }
409    }
410    
411    boolean monitorAndPrintJob() throws IOException, InterruptedException {
412      return job.monitorAndPrintJob();
413    }
414    
415    @Override
416    public String getFailureInfo() throws IOException {
417      try {
418        return job.getStatus().getFailureInfo();
419      } catch (InterruptedException ie) {
420        throw new IOException(ie);
421      }
422    }
423
424    @Override
425    public JobStatus getJobStatus() throws IOException {
426      try {
427        return JobStatus.downgrade(job.getStatus());
428      } catch (InterruptedException ie) {
429        throw new IOException(ie);
430      }
431    }
432  }
433
434  /**
435   * Ugi of the client. We store this ugi when the client is created and 
436   * then make sure that the same ugi is used to run the various protocols.
437   */
438  UserGroupInformation clientUgi;
439  
440  /**
441   * Create a job client.
442   */
443  public JobClient() {
444  }
445    
446  /**
447   * Build a job client with the given {@link JobConf}, and connect to the 
448   * default cluster
449   * 
450   * @param conf the job configuration.
451   * @throws IOException
452   */
453  public JobClient(JobConf conf) throws IOException {
454    init(conf);
455  }
456
457  /**
458   * Build a job client with the given {@link Configuration}, 
459   * and connect to the default cluster
460   * 
461   * @param conf the configuration.
462   * @throws IOException
463   */
464  public JobClient(Configuration conf) throws IOException {
465    init(new JobConf(conf));
466  }
467
468  /**
469   * Connect to the default cluster
470   * @param conf the job configuration.
471   * @throws IOException
472   */
473  public void init(JobConf conf) throws IOException {
474    setConf(conf);
475    cluster = new Cluster(conf);
476    clientUgi = UserGroupInformation.getCurrentUser();
477
478    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
479      MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
480
481    retryInterval =
482      conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
483        MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
484
485  }
486
487  /**
   * Build a job client and connect to the indicated job tracker.
489   * 
490   * @param jobTrackAddr the job tracker to connect to.
491   * @param conf configuration.
492   */
493  public JobClient(InetSocketAddress jobTrackAddr, 
494                   Configuration conf) throws IOException {
495    cluster = new Cluster(jobTrackAddr, conf);
496    clientUgi = UserGroupInformation.getCurrentUser();
497  }
498
499  /**
500   * Close the <code>JobClient</code>.
501   */
502  public synchronized void close() throws IOException {
503    cluster.close();
504  }
505
506  /**
507   * Get a filesystem handle.  We need this to prepare jobs
508   * for submission to the MapReduce system.
509   * 
510   * @return the filesystem handle.
511   */
512  public synchronized FileSystem getFs() throws IOException {
513    try { 
514      return cluster.getFileSystem();
515    } catch (InterruptedException ie) {
516      throw new IOException(ie);
517    }
518  }
519  
520  /**
521   * Get a handle to the Cluster
522   */
523  public Cluster getClusterHandle() {
524    return cluster;
525  }
526  
527  /**
528   * Submit a job to the MR system.
529   * 
530   * This returns a handle to the {@link RunningJob} which can be used to track
531   * the running-job.
532   * 
533   * @param jobFile the job configuration.
534   * @return a handle to the {@link RunningJob} which can be used to track the
535   *         running-job.
536   * @throws FileNotFoundException
537   * @throws InvalidJobConfException
538   * @throws IOException
539   */
540  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
541                                                     InvalidJobConfException, 
542                                                     IOException {
543    // Load in the submitted job details
544    JobConf job = new JobConf(jobFile);
545    return submitJob(job);
546  }
547    
548  /**
549   * Submit a job to the MR system.
550   * This returns a handle to the {@link RunningJob} which can be used to track
551   * the running-job.
552   * 
553   * @param conf the job configuration.
554   * @return a handle to the {@link RunningJob} which can be used to track the
555   *         running-job.
556   * @throws FileNotFoundException
557   * @throws IOException
558   */
559  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
560                                                  IOException {
561    return submitJobInternal(conf);
562  }
563
564  @InterfaceAudience.Private
565  public RunningJob submitJobInternal(final JobConf conf)
566      throws FileNotFoundException, IOException {
567    try {
568      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
569      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
570      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
571        @Override
572        public Job run() throws IOException, ClassNotFoundException, 
573          InterruptedException {
574          Job job = Job.getInstance(conf);
575          job.submit();
576          return job;
577        }
578      });
579
580      Cluster prev = cluster;
581      // update our Cluster instance with the one created by Job for submission
582      // (we can't pass our Cluster instance to Job, since Job wraps the config
583      // instance, and the two configs would then diverge)
584      cluster = job.getCluster();
585
586      // It is important to close the previous cluster instance
587      // to cleanup resources.
588      if (prev != null) {
589        prev.close();
590      }
591      return new NetworkedJob(job);
592    } catch (InterruptedException ie) {
593      throw new IOException("interrupted", ie);
594    }
595  }
596
597  private Job getJobUsingCluster(final JobID jobid) throws IOException,
598  InterruptedException {
599    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
600      public Job run() throws IOException, InterruptedException  {
601       return cluster.getJob(jobid);
602      }
603    });
604  }
605
606  protected RunningJob getJobInner(final JobID jobid) throws IOException {
607    try {
608      
609      Job job = getJobUsingCluster(jobid);
610      if (job != null) {
611        JobStatus status = JobStatus.downgrade(job.getStatus());
612        if (status != null) {
613          return new NetworkedJob(status, cluster,
614              new JobConf(job.getConfiguration()));
615        } 
616      }
617    } catch (InterruptedException ie) {
618      throw new IOException(ie);
619    }
620    return null;
621  }
622
623  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
625   * null if the id does not correspond to any known job.
626   *
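   * <p>A minimal sketch of looking up a job by id and inspecting it (the
   * job id string below is a placeholder):
   * <pre>
   *   JobClient client = new JobClient(new JobConf());
   *   RunningJob running = client.getJob(JobID.forName("job_1400000000000_0001"));
   *   if (running != null) {
   *     System.out.println(running.getJobName() + ": map "
   *         + (int) (running.mapProgress() * 100) + "% complete");
   *   }
   * </pre>
   *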
627   * @param jobid the jobid of the job.
628   * @return the {@link RunningJob} handle to track the job, null if the
629   *         <code>jobid</code> doesn't correspond to any known job.
630   * @throws IOException
631   */
632  public RunningJob getJob(final JobID jobid) throws IOException {
    for (int i = 0; i <= maxRetry; i++) {
      if (i > 0) {
        try {
          Thread.sleep(retryInterval);
        } catch (InterruptedException ie) {
          // Restore the interrupt status instead of silently swallowing it.
          Thread.currentThread().interrupt();
        }
      }
      RunningJob job = getJobInner(jobid);
      if (job != null) {
        return job;
      }
    }
    return null;
645  }
646
647  /**@deprecated Applications should rather use {@link #getJob(JobID)}.
648   */
649  @Deprecated
650  public RunningJob getJob(String jobid) throws IOException {
651    return getJob(JobID.forName(jobid));
652  }
653  
654  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
655  
656  /**
657   * Get the information of the current state of the map tasks of a job.
658   * 
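   * <p>For example (illustrative only; <code>client</code> and
   * <code>jobId</code> are placeholders), to print the progress of every
   * map task:
   * <pre>
   *   for (TaskReport report : client.getMapTaskReports(jobId)) {
   *     System.out.println(report.getTaskID() + " " + report.getProgress());
   *   }
   * </pre>
   * 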
659   * @param jobId the job to query.
660   * @return the list of all of the map tips.
661   * @throws IOException
662   */
663  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
664    return getTaskReports(jobId, TaskType.MAP);
665  }
666  
667  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
668    IOException {
669    try {
670      Job j = getJobUsingCluster(jobId);
671      if(j == null) {
672        return EMPTY_TASK_REPORTS;
673      }
674      return TaskReport.downgradeArray(j.getTaskReports(type));
675    } catch (InterruptedException ie) {
676      throw new IOException(ie);
677    }
678  }
679  
680  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
681  @Deprecated
682  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
683    return getMapTaskReports(JobID.forName(jobId));
684  }
685  
686  /**
687   * Get the information of the current state of the reduce tasks of a job.
688   * 
689   * @param jobId the job to query.
690   * @return the list of all of the reduce tips.
691   * @throws IOException
692   */    
693  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
694    return getTaskReports(jobId, TaskType.REDUCE);
695  }
696
697  /**
698   * Get the information of the current state of the cleanup tasks of a job.
699   * 
700   * @param jobId the job to query.
701   * @return the list of all of the cleanup tips.
702   * @throws IOException
703   */    
704  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
705    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
706  }
707
708  /**
709   * Get the information of the current state of the setup tasks of a job.
710   * 
711   * @param jobId the job to query.
712   * @return the list of all of the setup tips.
713   * @throws IOException
714   */    
715  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
716    return getTaskReports(jobId, TaskType.JOB_SETUP);
717  }
718
719  
720  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
721  @Deprecated
722  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
723    return getReduceTaskReports(JobID.forName(jobId));
724  }
725  
726  /**
727   * Display the information about a job's tasks, of a particular type and
728   * in a particular state
729   * 
730   * @param jobId the ID of the job
731   * @param type the type of the task (map/reduce/setup/cleanup)
732   * @param state the state of the task 
733   * (pending/running/completed/failed/killed)
734   */
735  public void displayTasks(final JobID jobId, String type, String state) 
736  throws IOException {
737    try {
738      Job job = getJobUsingCluster(jobId);
739      super.displayTasks(job, type, state);
740    } catch (InterruptedException ie) {
741      throw new IOException(ie);
742    }
743  }
744  
745  /**
746   * Get status information about the Map-Reduce cluster.
747   *  
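   * <p>A small sketch of reporting overall slot usage from the returned
   * status (<code>client</code> is a placeholder):
   * <pre>
   *   ClusterStatus status = client.getClusterStatus();
   *   System.out.println("map slots: " + status.getMapTasks()
   *       + "/" + status.getMaxMapTasks());
   *   System.out.println("reduce slots: " + status.getReduceTasks()
   *       + "/" + status.getMaxReduceTasks());
   * </pre>
   *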
748   * @return the status information about the Map-Reduce cluster as an object
749   *         of {@link ClusterStatus}.
750   * @throws IOException
751   */
752  public ClusterStatus getClusterStatus() throws IOException {
753    try {
754      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
755        public ClusterStatus run() throws IOException, InterruptedException {
756          ClusterMetrics metrics = cluster.getClusterStatus();
757          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
758            .getBlackListedTaskTrackerCount(), cluster
759            .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
760            metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
761            metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
762            metrics.getDecommissionedTaskTrackerCount(), metrics
763              .getGrayListedTaskTrackerCount());
764        }
765      });
766    } catch (InterruptedException ie) {
767      throw new IOException(ie);
768    }
769  }
770
771  private  Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
772    Collection<String> list = new ArrayList<String>();
773    for (TaskTrackerInfo info: objs) {
774      list.add(info.getTaskTrackerName());
775    }
776    return list;
777  }
778
779  private  Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
780    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
781    for (TaskTrackerInfo info: objs) {
782      BlackListInfo binfo = new BlackListInfo();
783      binfo.setTrackerName(info.getTaskTrackerName());
784      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
785      binfo.setBlackListReport(info.getBlacklistReport());
786      list.add(binfo);
787    }
788    return list;
789  }
790
791  /**
792   * Get status information about the Map-Reduce cluster.
793   *  
794   * @param  detailed if true then get a detailed status including the
795   *         tracker names
796   * @return the status information about the Map-Reduce cluster as an object
797   *         of {@link ClusterStatus}.
798   * @throws IOException
799   */
800  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
801    try {
802      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
803        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
811        }
812      });
813    } catch (InterruptedException ie) {
814      throw new IOException(ie);
815    }
816  }
817    
818
819  /** 
820   * Get the jobs that are not completed and not failed.
821   * 
822   * @return array of {@link JobStatus} for the running/to-be-run jobs.
823   * @throws IOException
824   */
825  public JobStatus[] jobsToComplete() throws IOException {
826    List<JobStatus> stats = new ArrayList<JobStatus>();
827    for (JobStatus stat : getAllJobs()) {
828      if (!stat.isJobComplete()) {
829        stats.add(stat);
830      }
831    }
832    return stats.toArray(new JobStatus[0]);
833  }
834
835  /** 
836   * Get the jobs that are submitted.
837   * 
838   * @return array of {@link JobStatus} for the submitted jobs.
839   * @throws IOException
840   */
841  public JobStatus[] getAllJobs() throws IOException {
842    try {
843      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
844          clientUgi.doAs(new PrivilegedExceptionAction<
845              org.apache.hadoop.mapreduce.JobStatus[]> () {
846            public org.apache.hadoop.mapreduce.JobStatus[] run() 
847                throws IOException, InterruptedException {
848              return cluster.getAllJobStatuses();
849            }
850          });
851      JobStatus[] stats = new JobStatus[jobs.length];
852      for (int i = 0; i < jobs.length; i++) {
853        stats[i] = JobStatus.downgrade(jobs[i]);
854      }
855      return stats;
856    } catch (InterruptedException ie) {
857      throw new IOException(ie);
858    }
859  }
860  
861  /** 
862   * Utility that submits a job, then polls for progress until the job is
863   * complete.
864   * 
865   * @param job the job configuration.
866   * @throws IOException if the job fails
867   */
868  public static RunningJob runJob(JobConf job) throws IOException {
869    JobClient jc = new JobClient(job);
870    RunningJob rj = jc.submitJob(job);
871    try {
872      if (!jc.monitorAndPrintJob(job, rj)) {
873        throw new IOException("Job failed!");
874      }
875    } catch (InterruptedException ie) {
876      Thread.currentThread().interrupt();
877    }
878    return rj;
879  }
880  
881  /**
882   * Monitor a job and print status in real-time as progress is made and tasks 
883   * fail.
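   *
   * <p>A sketch of submitting a job and then monitoring it to completion
   * (<code>client</code> and <code>conf</code> are placeholders):
   * <pre>
   *   RunningJob running = client.submitJob(conf);
   *   boolean success = client.monitorAndPrintJob(conf, running);
   * </pre>
   *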
884   * @param conf the job's configuration
885   * @param job the job to track
886   * @return true if the job succeeded
887   * @throws IOException if communication to the JobTracker fails
888   */
889  public boolean monitorAndPrintJob(JobConf conf, 
890                                    RunningJob job
891  ) throws IOException, InterruptedException {
892    return ((NetworkedJob)job).monitorAndPrintJob();
893  }
894
895  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
896    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
897  }
898  
  static Configuration getConfiguration(String jobTrackerSpec) {
901    Configuration conf = new Configuration();
902    if (jobTrackerSpec != null) {        
903      if (jobTrackerSpec.indexOf(":") >= 0) {
904        conf.set("mapred.job.tracker", jobTrackerSpec);
905      } else {
906        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
907        URL validate = conf.getResource(classpathFile);
908        if (validate == null) {
909          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
910        }
911        conf.addResource(classpathFile);
912      }
913    }
914    return conf;
915  }
916
917  /**
   * Sets the output filter for tasks. Only those tasks whose
   * output matches the filter are printed.
920   * @param newValue task filter.
921   */
922  @Deprecated
923  public void setTaskOutputFilter(TaskStatusFilter newValue){
924    this.taskOutputFilter = newValue;
925  }
926    
927  /**
928   * Get the task output filter out of the JobConf.
929   * 
930   * @param job the JobConf to examine.
931   * @return the filter level.
932   */
933  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
934    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
935                                            "FAILED"));
936  }
937    
938  /**
939   * Modify the JobConf to set the task output filter.
940   * 
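   * <p>For example, to have the client report output from all tasks of a
   * job (a sketch; <code>job</code> is the job's JobConf):
   * <pre>
   *   JobClient.setTaskOutputFilter(job, JobClient.TaskStatusFilter.ALL);
   * </pre>
   *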
941   * @param job the JobConf to modify.
942   * @param newValue the value to set.
943   */
944  public static void setTaskOutputFilter(JobConf job, 
945                                         TaskStatusFilter newValue) {
946    job.set("jobclient.output.filter", newValue.toString());
947  }
948    
949  /**
950   * Returns task output filter.
951   * @return task filter. 
952   */
953  @Deprecated
954  public TaskStatusFilter getTaskOutputFilter(){
955    return this.taskOutputFilter; 
956  }
957
958  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
959      String counterGroupName, String counterName) throws IOException {
960    Counters counters = Counters.downgrade(cntrs);
961    return counters.findCounter(counterGroupName, counterName).getValue();
962  }
963
964  /**
965   * Get status information about the max available Maps in the cluster.
966   *  
967   * @return the max available Maps in the cluster
968   * @throws IOException
969   */
970  public int getDefaultMaps() throws IOException {
971    try {
972      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
973        @Override
974        public Integer run() throws IOException, InterruptedException {
975          return cluster.getClusterStatus().getMapSlotCapacity();
976        }
977      });
978    } catch (InterruptedException ie) {
979      throw new IOException(ie);
980    }
981  }
982
983  /**
984   * Get status information about the max available Reduces in the cluster.
985   *  
986   * @return the max available Reduces in the cluster
987   * @throws IOException
988   */
989  public int getDefaultReduces() throws IOException {
990    try {
991      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
992        @Override
993        public Integer run() throws IOException, InterruptedException {
994          return cluster.getClusterStatus().getReduceSlotCapacity();
995        }
996      });
997    } catch (InterruptedException ie) {
998      throw new IOException(ie);
999    }
1000  }
1001
1002  /**
1003   * Grab the jobtracker system directory path where job-specific files are to be placed.
1004   * 
1005   * @return the system directory where job-specific files are to be placed.
1006   */
1007  public Path getSystemDir() {
1008    try {
1009      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
1010        @Override
1011        public Path run() throws IOException, InterruptedException {
1012          return cluster.getSystemDir();
1013        }
1014      });
    } catch (IOException ioe) {
1016      return null;
1017    } catch (InterruptedException ie) {
1018      return null;
1019    }
1020  }
1021
1022  /**
1023   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
1025   */
1026  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
1027      throws IOException {
1028    FileStatus[] contents = fs.listStatus(jobDirPath);
1029    int matchCount = 0;
1030    if (contents != null && contents.length >= 2) {
1031      for (FileStatus status : contents) {
1032        if ("job.xml".equals(status.getPath().getName())) {
1033          ++matchCount;
1034        }
1035        if ("job.split".equals(status.getPath().getName())) {
1036          ++matchCount;
1037        }
1038      }
1039      if (matchCount == 2) {
1040        return true;
1041      }
1042    }
1043    return false;
1044  }
1045
1046  /**
1047   * Fetch the staging area directory for the application
1048   * 
1049   * @return path to staging area directory
1050   * @throws IOException
1051   */
1052  public Path getStagingAreaDir() throws IOException {
1053    try {
1054      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
1055        @Override
1056        public Path run() throws IOException, InterruptedException {
1057          return cluster.getStagingAreaDir();
1058        }
1059      });
1060    } catch (InterruptedException ie) {
1061      // throw RuntimeException instead for compatibility reasons
1062      throw new RuntimeException(ie);
1063    }
1064  }
1065
1066  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
1067    JobQueueInfo ret = new JobQueueInfo(queue);
1068    // make sure to convert any children
1069    if (queue.getQueueChildren().size() > 0) {
1070      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
1071          .getQueueChildren().size());
1072      for (QueueInfo child : queue.getQueueChildren()) {
1073        childQueues.add(getJobQueueInfo(child));
1074      }
1075      ret.setChildren(childQueues);
1076    }
1077    return ret;
1078  }
1079
1080  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
1081      throws IOException {
1082    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
1083    for (int i = 0; i < queues.length; i++) {
1084      ret[i] = getJobQueueInfo(queues[i]);
1085    }
1086    return ret;
1087  }
1088
1089  /**
   * Returns an array of queue information objects for the root-level queues
   * configured in the cluster.
1092   *
1093   * @return the array of root level JobQueueInfo objects
1094   * @throws IOException
1095   */
1096  public JobQueueInfo[] getRootQueues() throws IOException {
1097    try {
1098      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1099        public JobQueueInfo[] run() throws IOException, InterruptedException {
1100          return getJobQueueInfoArray(cluster.getRootQueues());
1101        }
1102      });
1103    } catch (InterruptedException ie) {
1104      throw new IOException(ie);
1105    }
1106  }
1107
1108  /**
   * Returns an array of queue information objects for the immediate children
   * of the queue <code>queueName</code>.
   * 
   * @param queueName the name of the parent queue
1113   * @return the array of immediate children JobQueueInfo objects
1114   * @throws IOException
1115   */
1116  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
1117    try {
1118      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1119        public JobQueueInfo[] run() throws IOException, InterruptedException {
1120          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
1121        }
1122      });
1123    } catch (InterruptedException ie) {
1124      throw new IOException(ie);
1125    }
1126  }
1127  
1128  /**
1129   * Return an array of queue information objects about all the Job Queues
1130   * configured.
1131   * 
1132   * @return Array of JobQueueInfo objects
1133   * @throws IOException
1134   */
1135  public JobQueueInfo[] getQueues() throws IOException {
1136    try {
1137      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1138        public JobQueueInfo[] run() throws IOException, InterruptedException {
1139          return getJobQueueInfoArray(cluster.getQueues());
1140        }
1141      });
1142    } catch (InterruptedException ie) {
1143      throw new IOException(ie);
1144    }
1145  }
1146  
1147  /**
   * Gets all the jobs that were added to a particular job queue.
1149   * 
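   * <p>For example (illustrative; the queue name is a placeholder):
   * <pre>
   *   JobStatus[] jobs = client.getJobsFromQueue("default");
   *   if (jobs != null) {
   *     for (JobStatus status : jobs) {
   *       System.out.println(status.getJobID() + " " + status.getRunState());
   *     }
   *   }
   * </pre>
   *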
1150   * @param queueName name of the Job Queue
1151   * @return Array of jobs present in the job queue
1152   * @throws IOException
1153   */
1154  
1155  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
1156    try {
1157      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
1158        @Override
1159        public QueueInfo run() throws IOException, InterruptedException {
1160          return cluster.getQueue(queueName);
1161        }
1162      });
1163      if (queue == null) {
1164        return null;
1165      }
1166      org.apache.hadoop.mapreduce.JobStatus[] stats = 
1167        queue.getJobStatuses();
1168      JobStatus[] ret = new JobStatus[stats.length];
1169      for (int i = 0 ; i < stats.length; i++ ) {
1170        ret[i] = JobStatus.downgrade(stats[i]);
1171      }
1172      return ret;
1173    } catch (InterruptedException ie) {
1174      throw new IOException(ie);
1175    }
1176  }
1177  
1178  /**
   * Gets the queue information associated with a particular job queue.
1180   * 
1181   * @param queueName name of the job queue.
1182   * @return Queue information associated to particular queue.
1183   * @throws IOException
1184   */
1185  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
1186    try {
1187      QueueInfo queueInfo = clientUgi.doAs(new 
1188          PrivilegedExceptionAction<QueueInfo>() {
1189        public QueueInfo run() throws IOException, InterruptedException {
1190          return cluster.getQueue(queueName);
1191        }
1192      });
1193      if (queueInfo != null) {
1194        return new JobQueueInfo(queueInfo);
1195      }
1196      return null;
1197    } catch (InterruptedException ie) {
1198      throw new IOException(ie);
1199    }
1200  }
1201  
1202  /**
   * Gets the queue ACLs for the current user.
1204   * @return array of QueueAclsInfo object for current user.
1205   * @throws IOException
1206   */
1207  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
1208    try {
1209      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
1210        clientUgi.doAs(new 
1211            PrivilegedExceptionAction
1212            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
1213              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
1214              throws IOException, InterruptedException {
1215                return cluster.getQueueAclsForCurrentUser();
1216              }
1217        });
1218      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
1219      for (int i = 0 ; i < acls.length; i++ ) {
1220        ret[i] = QueueAclsInfo.downgrade(acls[i]);
1221      }
1222      return ret;
1223    } catch (InterruptedException ie) {
1224      throw new IOException(ie);
1225    }
1226  }
1227
1228  /**
1229   * Get a delegation token for the user from the JobTracker.
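   *
   * <p>A sketch of fetching a token on behalf of a renewer (the renewer
   * principal shown is a placeholder):
   * <pre>
   *   Token&lt;DelegationTokenIdentifier&gt; token =
   *       client.getDelegationToken(new Text("renewer@EXAMPLE.COM"));
   * </pre>
   *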
1230   * @param renewer the user who can renew the token
1231   * @return the new token
1232   * @throws IOException
1233   */
1234  public Token<DelegationTokenIdentifier> 
1235    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
1236    return clientUgi.doAs(new 
1237        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
1238      public Token<DelegationTokenIdentifier> run() throws IOException, 
1239      InterruptedException {
1240        return cluster.getDelegationToken(renewer);
1241      }
1242    });
1243  }
1244
1245  /**
1246   * Renew a delegation token
1247   * @param token the token to renew
1248   * @return true if the renewal went well
1249   * @throws InvalidToken
1250   * @throws IOException
1251   * @deprecated Use {@link Token#renew} instead
1252   */
1253  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
1254                                   ) throws InvalidToken, IOException, 
1255                                            InterruptedException {
1256    return token.renew(getConf());
1257  }
1258
1259  /**
1260   * Cancel a delegation token from the JobTracker
1261   * @param token the token to cancel
1262   * @throws IOException
1263   * @deprecated Use {@link Token#cancel} instead
1264   */
1265  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
1266                                    ) throws InvalidToken, IOException, 
1267                                             InterruptedException {
1268    token.cancel(getConf());
1269  }
1270
  /**
   * Command-line entry point: runs the <code>JobClient</code> as a
   * {@link Tool} via {@link ToolRunner}.
   */
1273  public static void main(String argv[]) throws Exception {
1274    int res = ToolRunner.run(new JobClient(), argv);
1275    System.exit(res);
1276  }
1277}
1278