/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setting up the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system directory
 * on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the cluster and optionally monitoring
 * its status.
 * </li>
 * </ol>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job typically goes to the distributed file-system, and
 * that can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 * <li>
 * {@link #runJob(JobConf)} : submits the job and returns only after
 * the job has completed.
 * </li>
 * <li>
 * {@link #submitJob(JobConf)} : only submits the job, then poll the
 * returned handle to the {@link RunningJob} to query status and make
 * scheduling decisions (see the sketch below).
 * </li>
 * <li>
 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 * upon job-completion, thus avoiding polling.
 * </li>
 * </ol>
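 *
 * <p>As a rough sketch of the polling approach (assuming a fully configured
 * {@link JobConf}; the poll interval is illustrative):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // illustrative poll interval
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed: " + running.getFailureInfo());
 *     }
 * </pre></blockquote></p>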
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...

  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It wraps a
   * {@link Job} object to provide job information, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;

    /**
     * We store the wrapped {@link Job}. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job (e.g., 24 hours after the
     * job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed.
     * Applications should use {@link #getID()} instead. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should use
     * {@link #killTask(TaskAttemptID, boolean)} instead. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
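     *
     * <p>Events are returned in pages of up to 10, starting at
     * <code>startFrom</code>; a sketch of paging through all of them
     * (<code>runningJob</code> is an illustrative handle):</p>
     * <pre>
     *     int from = 0;
     *     TaskCompletionEvent[] events;
     *     do {
     *       events = runningJob.getTaskCompletionEvents(from);
     *       from += events.length;
     *     } while (events.length &gt; 0);
     * </pre>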
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Return the job's stats as a string.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // Update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge).
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
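   *
   * <p>A sketch (the job id string is illustrative):</p>
   * <pre>
   *     RunningJob running =
   *         client.getJob(JobID.forName("job_201301010000_0001"));
   *     if (running != null &amp;&amp; !running.isComplete()) {
   *       System.out.println(running.getTrackingURL());
   *     }
   * </pre>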
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should use {@link #getJob(JobID)} instead. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information on the current state of the map tasks of a job.
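   *
   * <p>For example, to print per-task progress (a sketch;
   * <code>client</code> is an initialized <code>JobClient</code>):</p>
   * <pre>
   *     for (TaskReport report : client.getMapTaskReports(jobId)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre>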
   *
   * @param jobId the job to query.
   * @return the list of reports for all map tasks.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should use {@link #getMapTaskReports(JobID)}
   * instead. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information on the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all reduce tasks.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information on the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all cleanup tasks.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information on the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all setup tasks.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should use {@link #getReduceTaskReports(JobID)}
   * instead. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
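   *
   * <p>For example (a sketch):
   * <code>client.displayTasks(jobId, "map", "completed");</code></p>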
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   *        (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
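   *
   * <p>For example, to check spare map-slot capacity (a sketch):</p>
   * <pre>
   *     ClusterStatus status = client.getClusterStatus();
   *     int spareMapSlots = status.getMaxMapTasks() - status.getMapTasks();
   * </pre>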
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
              .getBlackListedTaskTrackerCount(), cluster
              .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(), metrics
              .getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(
      TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *        tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, RunningJob job)
      throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks are printed whose
   * output matches the filter.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
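   *
   * <p>For example, to echo the output of all tasks rather than only failed
   * ones (a sketch):
   * <code>JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);</code></p>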
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the maximum number of map slots available
   * in the cluster.
   *
   * @return the maximum number of map slots available in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the maximum number of reduce slots available
   * in the cluster.
   *
   * @return the maximum number of reduce slots available in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
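   *
   * <p>A sketch (the path is illustrative):</p>
   * <pre>
   *     FileSystem fs = FileSystem.get(conf);
   *     Path jobDir = new Path("/mapred/system/job_201301010000_0001");
   *     if (JobClient.isJobDirValid(jobDir, fs)) {
   *       // safe to (re)start the job
   *     }
   * </pre>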
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to the staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw a RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured.
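   *
   * <p>A sketch of walking the queue hierarchy one level down:</p>
   * <pre>
   *     for (JobQueueInfo root : client.getRootQueues()) {
   *       for (JobQueueInfo child :
   *           client.getChildQueues(root.getQueueName())) {
   *         System.out.println(root.getQueueName() + " : "
   *             + child.getQueueName());
   *       }
   *     }
   * </pre>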
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate
   * children of queue queueName.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the job queues
   * configured.
   *
   * @return array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular job queue.
   *
   * @param queueName name of the job queue
   * @return array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            @Override
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular job queue.
   *
   * @param queueName name of the job queue.
   * @return queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
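   *
   * <p>A sketch (the renewer name is illustrative):</p>
   * <pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("myRenewerUser"));
   * </pre>
   *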
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer)
      throws IOException, InterruptedException {
    return clientUgi.doAs(
        new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
          public Token<DelegationTokenIdentifier> run() throws IOException,
              InterruptedException {
            return cluster.getDelegationToken(renewer);
          }
        });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * The main entry point; runs the JobClient command-line interface via
   * {@link ToolRunner}.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}