/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example of how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given job name.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the name of the job
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }
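  // A minimal construction sketch using the non-deprecated factories above
  // (the driver class name is hypothetical). Because getInstance(Configuration)
  // copies the Configuration, changes made to conf after this point are not
  // visible to the job:
  //
  //   Configuration conf = new Configuration();
  //   Job job = Job.getInstance(conf, "my-job");
  //   job.setJarByClass(MyDriver.class);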
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately. So, refresh
   * immediately.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return JobStatus#State
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }
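  // Sketch: reading submitted-job state through the getters above. getStatus()
  // and getJobState() refresh over RPC on every call, while getters such as
  // getTrackingURL() reuse the cached JobStatus:
  //
  //   JobStatus s = job.getStatus();
  //   System.out.println(s.getState() + " - " + job.getTrackingURL());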
  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }
  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason, which "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times. " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }
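  // Sketch: tallying finished map tasks from the reports above. Assumes a
  // RUNNING job; TaskReport and TIPStatus are part of this package:
  //
  //   int done = 0;
  //   for (TaskReport r : job.getTaskReports(TaskType.MAP)) {
  //     if (r.getCurrentStatus() == TIPStatus.COMPLETE) {
  //       done++;
  //     }
  //   }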
  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0. When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0. When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }
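  // Sketch: paging through completion events ten at a time, the same pattern
  // monitorAndPrintJob() uses further below:
  //
  //   int from = 0;
  //   TaskCompletionEvent[] batch;
  //   do {
  //     batch = job.getTaskCompletionEvents(from, 10);
  //     from += batch.length;
  //   } while (batch.length > 0);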
  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
      getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
      final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
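  // Sketch: reading a built-in counter once the job has run. TaskCounter is
  // this package's enum of per-task counters; getCounters() may return null
  // for a retired job, so check first:
  //
  //   Counters counters = job.getCounters();
  //   if (counters != null) {
  //     long mapRecords =
  //         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
  //   }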
  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }
  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }
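  // Sketch: the two comparator hooks below combine into the usual
  // "secondary sort" recipe (the comparator classes are hypothetical): sort
  // on the full composite key, then group reduce input by its natural part.
  //
  //   job.setSortComparatorClass(CompositeKeyComparator.class);
  //   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);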
  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }
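  // Sketch of a typical type setup built from the setters above (the mapper,
  // reducer, and key/value classes are hypothetical). A combiner must be a
  // Reducer whose input and output key/value types both match the map output
  // types:
  //
  //   job.setMapperClass(TokenMapper.class);
  //   job.setCombinerClass(SumReducer.class);
  //   job.setReducerClass(SumReducer.class);
  //   job.setMapOutputKeyClass(Text.class);
  //   job.setMapOutputValueClass(IntWritable.class);
  //   job.setOutputKeyClass(Text.class);
  //   job.setOutputValueClass(IntWritable.class);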
  /**
   * Specify whether job-setup and job-cleanup is needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }
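  // Sketch: shipping side data with the DistributedCache helpers above (the
  // URIs are hypothetical). Files are localized as-is, archives are unpacked
  // on the task node, and a "#name" fragment chooses the symlink name in the
  // task's working directory:
  //
  //   job.addCacheFile(new URI("hdfs:///apps/lookup/dict.txt#dict"));
  //   job.addCacheArchive(new URI("hdfs:///apps/lookup/libs.zip#libs"));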
  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the ranges apply to map tasks or reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                       throws IOException, InterruptedException,
                              ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
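  // Sketch of the conventional driver exit built on waitForCompletion()
  // (the driver class and its configuration are hypothetical):
  //
  //   public static void main(String[] args) throws Exception {
  //     Job job = Job.getInstance(new Configuration(), "myjob");
  //     // ... set mapper, reducer, input/output paths ...
  //     System.exit(job.waitForCompletion(true) ? 0 : 1);
  //   }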
  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
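  // Sketch: turning on hprof profiling for the first two tasks of each type;
  // shouldDownloadProfile() below looks for exactly this
  // "-agentlib:hprof...file=" shape, and the '%s' is replaced with the
  // profile output file name at task launch (the hprof option string is an
  // assumption, not a value defined in this class):
  //
  //   job.setProfileEnabled(true);
  //   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
  //       "force=n,thread=y,verbose=n,file=%s");
  //   job.setProfileTaskRange(true, "0-1");   // maps
  //   job.setProfileTaskRange(false, "0-1");  // reduces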
  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}