001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.IOException;
022import java.net.URI;
023import java.security.PrivilegedExceptionAction;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.classification.InterfaceAudience.Private;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.conf.Configuration.IntegerRanges;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.RawComparator;
035import org.apache.hadoop.mapred.JobConf;
036import org.apache.hadoop.mapreduce.filecache.DistributedCache;
037import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038import org.apache.hadoop.mapreduce.task.JobContextImpl;
039import org.apache.hadoop.mapreduce.util.ConfigUtil;
040import org.apache.hadoop.util.StringUtils;
041
042/**
043 * The job submitter's view of the Job.
044 * 
045 * <p>It allows the user to configure the
046 * job, submit it, control its execution, and query the state. The set methods
047 * only work until the job is submitted, afterwards they will throw an 
048 * IllegalStateException. </p>
049 * 
050 * <p>
051 * Normally the user creates the application, describes various facets of the
 * job via {@link Job} and then submits the job and monitors its progress.</p>
053 * 
054 * <p>Here is an example on how to submit a job:</p>
055 * <p><blockquote><pre>
056 *     // Create a new Job
057 *     Job job = new Job(new Configuration());
058 *     job.setJarByClass(MyJob.class);
059 *     
060 *     // Specify various job-specific parameters     
061 *     job.setJobName("myjob");
062 *     
063 *     job.setInputPath(new Path("in"));
064 *     job.setOutputPath(new Path("out"));
065 *     
066 *     job.setMapperClass(MyJob.MyMapper.class);
067 *     job.setReducerClass(MyJob.MyReducer.class);
068 *
069 *     // Submit the job, then poll for progress until the job is complete
070 *     job.waitForCompletion(true);
071 * </pre></blockquote></p>
072 * 
073 * 
074 */
075@InterfaceAudience.Public
076@InterfaceStability.Evolving
077public class Job extends JobContextImpl implements JobContext {  
  private static final Log LOG = LogFactory.getLog(Job.class);

  /** Lifecycle of a Job: DEFINE (still configurable) until submitted, then RUNNING. */
  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  // How long (ms) a cached JobStatus stays fresh; see ensureFreshStatus().
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY = 
    "mapreduce.client.completion.pollinterval";
  
  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER = 
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION = 
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
           "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  /** Task status categories a client may filter completion reports on. */
  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    // Register mapred resources with Configuration before any Job is built.
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;   // last status snapshot fetched from the cluster
  private long statustime;    // wall-clock time (ms) when 'status' was fetched
  private Cluster cluster;    // cluster connection; null until attached
115
  /** @deprecated Use {@link #getInstance()} instead. */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /** @deprecated Use {@link #getInstance(Configuration)} instead. */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /** @deprecated Use {@link #getInstance(Configuration, String)} instead. */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  // Package-private: all public creation paths funnel through here.
  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  // Package-private: wraps an already-known status; starts in RUNNING state.
  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }
145
146      
147  /**
148   * Creates a new {@link Job} with no particular {@link Cluster} .
149   * A Cluster will be created with a generic {@link Configuration}.
150   * 
151   * @return the {@link Job} , with no connection to a cluster yet.
152   * @throws IOException
153   */
154  public static Job getInstance() throws IOException {
155    // create with a null Cluster
156    return getInstance(new Configuration());
157  }
158      
159  /**
160   * Creates a new {@link Job} with no particular {@link Cluster} and a 
161   * given {@link Configuration}.
162   * 
163   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
164   * that any necessary internal modifications do not reflect on the incoming 
165   * parameter.
166   * 
167   * A Cluster will be created from the conf parameter only when it's needed.
168   * 
169   * @param conf the configuration
170   * @return the {@link Job} , with no connection to a cluster yet.
171   * @throws IOException
172   */
173  public static Job getInstance(Configuration conf) throws IOException {
174    // create with a null Cluster
175    JobConf jobConf = new JobConf(conf);
176    return new Job(jobConf);
177  }
178
179      
180  /**
181   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
182   * A Cluster will be created from the conf parameter only when it's needed.
183   *
184   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
185   * that any necessary internal modifications do not reflect on the incoming 
186   * parameter.
187   * 
188   * @param conf the configuration
189   * @return the {@link Job} , with no connection to a cluster yet.
190   * @throws IOException
191   */
192  public static Job getInstance(Configuration conf, String jobName)
193           throws IOException {
194    // create with a null Cluster
195    Job result = getInstance(conf);
196    result.setJobName(jobName);
197    return result;
198  }
199  
200  /**
201   * Creates a new {@link Job} with no particular {@link Cluster} and given
202   * {@link Configuration} and {@link JobStatus}.
203   * A Cluster will be created from the conf parameter only when it's needed.
204   * 
205   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
206   * that any necessary internal modifications do not reflect on the incoming 
207   * parameter.
208   * 
209   * @param status job status
210   * @param conf job configuration
211   * @return the {@link Job} , with no connection to a cluster yet.
212   * @throws IOException
213   */
214  public static Job getInstance(JobStatus status, Configuration conf) 
215  throws IOException {
216    return new Job(status, new JobConf(conf));
217  }
218
219  /**
220   * Creates a new {@link Job} with no particular {@link Cluster}.
221   * A Cluster will be created from the conf parameter only when it's needed.
222   *
223   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
224   * that any necessary internal modifications do not reflect on the incoming 
225   * parameter.
226   * 
227   * @param ignored
228   * @return the {@link Job} , with no connection to a cluster yet.
229   * @throws IOException
230   * @deprecated Use {@link #getInstance()}
231   */
232  @Deprecated
233  public static Job getInstance(Cluster ignored) throws IOException {
234    return getInstance();
235  }
236  
237  /**
238   * Creates a new {@link Job} with no particular {@link Cluster} and given
239   * {@link Configuration}.
240   * A Cluster will be created from the conf parameter only when it's needed.
241   * 
242   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
243   * that any necessary internal modifications do not reflect on the incoming 
244   * parameter.
245   * 
246   * @param ignored
247   * @param conf job configuration
248   * @return the {@link Job} , with no connection to a cluster yet.
249   * @throws IOException
250   * @deprecated Use {@link #getInstance(Configuration)}
251   */
252  @Deprecated
253  public static Job getInstance(Cluster ignored, Configuration conf) 
254      throws IOException {
255    return getInstance(conf);
256  }
257  
258  /**
259   * Creates a new {@link Job} with no particular {@link Cluster} and given
260   * {@link Configuration} and {@link JobStatus}.
261   * A Cluster will be created from the conf parameter only when it's needed.
262   * 
263   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
264   * that any necessary internal modifications do not reflect on the incoming 
265   * parameter.
266   * 
267   * @param cluster cluster
268   * @param status job status
269   * @param conf job configuration
270   * @return the {@link Job} , with no connection to a cluster yet.
271   * @throws IOException
272   */
273  @Private
274  public static Job getInstance(Cluster cluster, JobStatus status, 
275      Configuration conf) throws IOException {
276    Job job = getInstance(status, conf);
277    job.setCluster(cluster);
278    return job;
279  }
280
281  private void ensureState(JobState state) throws IllegalStateException {
282    if (state != this.state) {
283      throw new IllegalStateException("Job in state "+ this.state + 
284                                      " instead of " + state);
285    }
286
287    if (state == JobState.RUNNING && cluster == null) {
288      throw new IllegalStateException
289        ("Job in state " + this.state
290         + ", but it isn't attached to any job tracker!");
291    }
292  }
293
294  /**
295   * Some methods rely on having a recent job status object.  Refresh
296   * it, if necessary
297   */
298  synchronized void ensureFreshStatus() 
299      throws IOException, InterruptedException {
300    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
301      updateStatus();
302    }
303  }
304    
  /** Some methods need to update status immediately. So, refresh
   * immediately by querying the cluster client as the job owner.
   * @throws IOException if no status is available for this job
   * @throws InterruptedException if the RPC is interrupted
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    // Run the RPC under the job owner's credentials.
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    // Record fetch time so ensureFreshStatus() can judge staleness.
    this.statustime = System.currentTimeMillis();
  }
321  
  /**
   * Get an up-to-date {@link JobStatus}; always refreshes from the cluster.
   * @return the latest job status
   * @throws IOException if the status cannot be fetched
   * @throws InterruptedException if the refresh is interrupted
   */
  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }
  
  // Replace the cached status snapshot.
  private void setStatus(JobStatus status) {
    this.status = status;
  }
331
332  /**
333   * Returns the current state of the Job.
334   * 
335   * @return JobStatus#State
336   * @throws IOException
337   * @throws InterruptedException
338   */
339  public JobStatus.State getJobState() 
340      throws IOException, InterruptedException {
341    ensureState(JobState.RUNNING);
342    updateStatus();
343    return status.getState();
344  }
345  
346  /**
347   * Get the URL where some job progress information will be displayed.
348   * 
349   * @return the URL where some job progress information will be displayed.
350   */
351  public String getTrackingURL(){
352    ensureState(JobState.RUNNING);
353    return status.getTrackingUrl().toString();
354  }
355
356  /**
357   * Get the path of the submitted job configuration.
358   * 
359   * @return the path of the submitted job configuration.
360   */
361  public String getJobFile() {
362    ensureState(JobState.RUNNING);
363    return status.getJobFile();
364  }
365
366  /**
367   * Get start time of the job.
368   * 
369   * @return the start time of the job
370   */
371  public long getStartTime() {
372    ensureState(JobState.RUNNING);
373    return status.getStartTime();
374  }
375
376  /**
377   * Get finish time of the job.
378   * 
379   * @return the finish time of the job
380   */
381  public long getFinishTime() throws IOException, InterruptedException {
382    ensureState(JobState.RUNNING);
383    updateStatus();
384    return status.getFinishTime();
385  }
386
387  /**
388   * Get scheduling info of the job.
389   * 
390   * @return the scheduling info of the job
391   */
392  public String getSchedulingInfo() {
393    ensureState(JobState.RUNNING);
394    return status.getSchedulingInfo();
395  }
396
397  /**
398   * Get scheduling info of the job.
399   * 
400   * @return the scheduling info of the job
401   */
402  public JobPriority getPriority() throws IOException, InterruptedException {
403    ensureState(JobState.RUNNING);
404    updateStatus();
405    return status.getPriority();
406  }
407
408  /**
409   * The user-specified job name.
410   */
411  public String getJobName() {
412    if (state == JobState.DEFINE) {
413      return super.getJobName();
414    }
415    ensureState(JobState.RUNNING);
416    return status.getJobName();
417  }
418
419  public String getHistoryUrl() throws IOException, InterruptedException {
420    ensureState(JobState.RUNNING);
421    updateStatus();
422    return status.getHistoryFile();
423  }
424
425  public boolean isRetired() throws IOException, InterruptedException {
426    ensureState(JobState.RUNNING);
427    updateStatus();
428    return status.isRetired();
429  }
430  
  /** @return the cluster this job is attached to, or null if unattached. */
  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }
441
442  /**
443   * Dump stats to screen.
444   */
445  @Override
446  public String toString() {
447    ensureState(JobState.RUNNING);
448    String reasonforFailure = " ";
449    int numMaps = 0;
450    int numReduces = 0;
451    try {
452      updateStatus();
453      if (status.getState().equals(JobStatus.State.FAILED))
454        reasonforFailure = getTaskFailureEventString();
455      numMaps = getTaskReports(TaskType.MAP).length;
456      numReduces = getTaskReports(TaskType.REDUCE).length;
457    } catch (IOException e) {
458    } catch (InterruptedException ie) {
459    }
460    StringBuffer sb = new StringBuffer();
461    sb.append("Job: ").append(status.getJobID()).append("\n");
462    sb.append("Job File: ").append(status.getJobFile()).append("\n");
463    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
464    sb.append("\n");
465    sb.append("Uber job : ").append(status.isUber()).append("\n");
466    sb.append("Number of maps: ").append(numMaps).append("\n");
467    sb.append("Number of reduces: ").append(numReduces).append("\n");
468    sb.append("map() completion: ");
469    sb.append(status.getMapProgress()).append("\n");
470    sb.append("reduce() completion: ");
471    sb.append(status.getReduceProgress()).append("\n");
472    sb.append("Job state: ");
473    sb.append(status.getState()).append("\n");
474    sb.append("retired: ").append(status.isRetired()).append("\n");
475    sb.append("reason for failure: ").append(reasonforFailure);
476    return sb.toString();
477  }
478
479  /**
480   * @return taskid which caused job failure
481   * @throws IOException
482   * @throws InterruptedException
483   */
484  String getTaskFailureEventString() throws IOException,
485      InterruptedException {
486    int failCount = 1;
487    TaskCompletionEvent lastEvent = null;
488    TaskCompletionEvent[] events = ugi.doAs(new 
489        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
490          @Override
491          public TaskCompletionEvent[] run() throws IOException,
492          InterruptedException {
493            return cluster.getClient().getTaskCompletionEvents(
494                status.getJobID(), 0, 10);
495          }
496        });
497    for (TaskCompletionEvent event : events) {
498      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
499        failCount++;
500        lastEvent = event;
501      }
502    }
503    if (lastEvent == null) {
504      return "There are no failed tasks for the job. "
505          + "Job is failed due to some other reason and reason "
506          + "can be found in the logs.";
507    }
508    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
509    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
510    return (" task " + taskID + " failed " +
511      failCount + " times " + "For details check tasktracker at: " +
512      lastEvent.getTaskTrackerHttp());
513  }
514
515  /**
516   * Get the information of the current state of the tasks of a job.
517   * 
518   * @param type Type of the task
519   * @return the list of all of the map tips.
520   * @throws IOException
521   */
522  public TaskReport[] getTaskReports(TaskType type) 
523      throws IOException, InterruptedException {
524    ensureState(JobState.RUNNING);
525    final TaskType tmpType = type;
526    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
527      public TaskReport[] run() throws IOException, InterruptedException {
528        return cluster.getClient().getTaskReports(getJobID(), tmpType);
529      }
530    });
531  }
532
533  /**
534   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
535   * and 1.0.  When all map tasks have completed, the function returns 1.0.
536   * 
537   * @return the progress of the job's map-tasks.
538   * @throws IOException
539   */
540  public float mapProgress() throws IOException, InterruptedException {
541    ensureState(JobState.RUNNING);
542    ensureFreshStatus();
543    return status.getMapProgress();
544  }
545
546  /**
547   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
548   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
549   * 
550   * @return the progress of the job's reduce-tasks.
551   * @throws IOException
552   */
553  public float reduceProgress() throws IOException, InterruptedException {
554    ensureState(JobState.RUNNING);
555    ensureFreshStatus();
556    return status.getReduceProgress();
557  }
558
559  /**
560   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
561   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
562   * 
563   * @return the progress of the job's cleanup-tasks.
564   * @throws IOException
565   */
566  public float cleanupProgress() throws IOException, InterruptedException {
567    ensureState(JobState.RUNNING);
568    ensureFreshStatus();
569    return status.getCleanupProgress();
570  }
571
572  /**
573   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
574   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
575   * 
576   * @return the progress of the job's setup-tasks.
577   * @throws IOException
578   */
579  public float setupProgress() throws IOException, InterruptedException {
580    ensureState(JobState.RUNNING);
581    ensureFreshStatus();
582    return status.getSetupProgress();
583  }
584
585  /**
586   * Check if the job is finished or not. 
587   * This is a non-blocking call.
588   * 
589   * @return <code>true</code> if the job is complete, else <code>false</code>.
590   * @throws IOException
591   */
592  public boolean isComplete() throws IOException, InterruptedException {
593    ensureState(JobState.RUNNING);
594    updateStatus();
595    return status.isJobComplete();
596  }
597
598  /**
599   * Check if the job completed successfully. 
600   * 
601   * @return <code>true</code> if the job succeeded, else <code>false</code>.
602   * @throws IOException
603   */
604  public boolean isSuccessful() throws IOException, InterruptedException {
605    ensureState(JobState.RUNNING);
606    updateStatus();
607    return status.getState() == JobStatus.State.SUCCEEDED;
608  }
609
610  /**
611   * Kill the running job.  Blocks until all job tasks have been
612   * killed as well.  If the job is no longer running, it simply returns.
613   * 
614   * @throws IOException
615   */
616  public void killJob() throws IOException, InterruptedException {
617    ensureState(JobState.RUNNING);
618    cluster.getClient().killJob(getJobID());
619  }
620
621  /**
622   * Set the priority of a running job.
623   * @param priority the new priority for the job.
624   * @throws IOException
625   */
626  public void setPriority(JobPriority priority) 
627      throws IOException, InterruptedException {
628    if (state == JobState.DEFINE) {
629      conf.setJobPriority(
630        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
631    } else {
632      ensureState(JobState.RUNNING);
633      final JobPriority tmpPriority = priority;
634      ugi.doAs(new PrivilegedExceptionAction<Object>() {
635        @Override
636        public Object run() throws IOException, InterruptedException {
637          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
638          return null;
639        }
640      });
641    }
642  }
643
644  /**
645   * Get events indicating completion (success/failure) of component tasks.
646   *  
647   * @param startFrom index to start fetching events from
648   * @param numEvents number of events to fetch
649   * @return an array of {@link TaskCompletionEvent}s
650   * @throws IOException
651   */
652  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
653      final int numEvents) throws IOException, InterruptedException {
654    ensureState(JobState.RUNNING);
655    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
656      @Override
657      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
658        return cluster.getClient().getTaskCompletionEvents(getJobID(),
659            startFrom, numEvents); 
660      }
661    });
662    }
663  
664  /**
665   * Kill indicated task attempt.
666   * 
667   * @param taskId the id of the task to be terminated.
668   * @throws IOException
669   */
670  public boolean killTask(final TaskAttemptID taskId) 
671      throws IOException, InterruptedException {
672    ensureState(JobState.RUNNING);
673    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
674      public Boolean run() throws IOException, InterruptedException {
675        return cluster.getClient().killTask(taskId, false);
676      }
677    });
678  }
679
680  /**
681   * Fail indicated task attempt.
682   * 
683   * @param taskId the id of the task to be terminated.
684   * @throws IOException
685   */
686  public boolean failTask(final TaskAttemptID taskId) 
687      throws IOException, InterruptedException {
688    ensureState(JobState.RUNNING);
689    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
690      @Override
691      public Boolean run() throws IOException, InterruptedException {
692        return cluster.getClient().killTask(taskId, true);
693      }
694    });
695  }
696
697  /**
698   * Gets the counters for this job. May return null if the job has been
699   * retired and the job is no longer in the completed job store.
700   * 
701   * @return the counters for this job.
702   * @throws IOException
703   */
704  public Counters getCounters() 
705      throws IOException, InterruptedException {
706    ensureState(JobState.RUNNING);
707    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
708      @Override
709      public Counters run() throws IOException, InterruptedException {
710        return cluster.getClient().getJobCounters(getJobID());
711      }
712    });
713  }
714
715  /**
716   * Gets the diagnostic messages for a given task attempt.
717   * @param taskid
718   * @return the list of diagnostic messages for the task
719   * @throws IOException
720   */
721  public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
722      throws IOException, InterruptedException {
723    ensureState(JobState.RUNNING);
724    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
725      @Override
726      public String[] run() throws IOException, InterruptedException {
727        return cluster.getClient().getTaskDiagnostics(taskid);
728      }
729    });
730  }
731
732  /**
733   * Set the number of reduce tasks for the job.
734   * @param tasks the number of reduce tasks
735   * @throws IllegalStateException if the job is submitted
736   */
737  public void setNumReduceTasks(int tasks) throws IllegalStateException {
738    ensureState(JobState.DEFINE);
739    conf.setNumReduceTasks(tasks);
740  }
741
742  /**
743   * Set the current working directory for the default file system.
744   * 
745   * @param dir the new current working directory.
746   * @throws IllegalStateException if the job is submitted
747   */
748  public void setWorkingDirectory(Path dir) throws IOException {
749    ensureState(JobState.DEFINE);
750    conf.setWorkingDirectory(dir);
751  }
752
753  /**
754   * Set the {@link InputFormat} for the job.
755   * @param cls the <code>InputFormat</code> to use
756   * @throws IllegalStateException if the job is submitted
757   */
758  public void setInputFormatClass(Class<? extends InputFormat> cls
759                                  ) throws IllegalStateException {
760    ensureState(JobState.DEFINE);
761    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
762                  InputFormat.class);
763  }
764
765  /**
766   * Set the {@link OutputFormat} for the job.
767   * @param cls the <code>OutputFormat</code> to use
768   * @throws IllegalStateException if the job is submitted
769   */
770  public void setOutputFormatClass(Class<? extends OutputFormat> cls
771                                   ) throws IllegalStateException {
772    ensureState(JobState.DEFINE);
773    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
774                  OutputFormat.class);
775  }
776
777  /**
778   * Set the {@link Mapper} for the job.
779   * @param cls the <code>Mapper</code> to use
780   * @throws IllegalStateException if the job is submitted
781   */
782  public void setMapperClass(Class<? extends Mapper> cls
783                             ) throws IllegalStateException {
784    ensureState(JobState.DEFINE);
785    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
786  }
787
788  /**
789   * Set the Jar by finding where a given class came from.
790   * @param cls the example class
791   */
792  public void setJarByClass(Class<?> cls) {
793    ensureState(JobState.DEFINE);
794    conf.setJarByClass(cls);
795  }
796
797  /**
798   * Set the job jar 
799   */
800  public void setJar(String jar) {
801    ensureState(JobState.DEFINE);
802    conf.setJar(jar);
803  }
804
805  /**
806   * Set the reported username for this job.
807   * 
808   * @param user the username for this job.
809   */
810  public void setUser(String user) {
811    ensureState(JobState.DEFINE);
812    conf.setUser(user);
813  }
814
815  /**
816   * Set the combiner class for the job.
817   * @param cls the combiner to use
818   * @throws IllegalStateException if the job is submitted
819   */
820  public void setCombinerClass(Class<? extends Reducer> cls
821                               ) throws IllegalStateException {
822    ensureState(JobState.DEFINE);
823    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
824  }
825
826  /**
827   * Set the {@link Reducer} for the job.
828   * @param cls the <code>Reducer</code> to use
829   * @throws IllegalStateException if the job is submitted
830   */
831  public void setReducerClass(Class<? extends Reducer> cls
832                              ) throws IllegalStateException {
833    ensureState(JobState.DEFINE);
834    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
835  }
836
837  /**
838   * Set the {@link Partitioner} for the job.
839   * @param cls the <code>Partitioner</code> to use
840   * @throws IllegalStateException if the job is submitted
841   */
842  public void setPartitionerClass(Class<? extends Partitioner> cls
843                                  ) throws IllegalStateException {
844    ensureState(JobState.DEFINE);
845    conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
846                  Partitioner.class);
847  }
848
849  /**
850   * Set the key class for the map output data. This allows the user to
851   * specify the map output key class to be different than the final output
852   * value class.
853   * 
854   * @param theClass the map output key class.
855   * @throws IllegalStateException if the job is submitted
856   */
857  public void setMapOutputKeyClass(Class<?> theClass
858                                   ) throws IllegalStateException {
859    ensureState(JobState.DEFINE);
860    conf.setMapOutputKeyClass(theClass);
861  }
862
863  /**
864   * Set the value class for the map output data. This allows the user to
865   * specify the map output value class to be different than the final output
866   * value class.
867   * 
868   * @param theClass the map output value class.
869   * @throws IllegalStateException if the job is submitted
870   */
871  public void setMapOutputValueClass(Class<?> theClass
872                                     ) throws IllegalStateException {
873    ensureState(JobState.DEFINE);
874    conf.setMapOutputValueClass(theClass);
875  }
876
877  /**
878   * Set the key class for the job output data.
879   * 
880   * @param theClass the key class for the job output data.
881   * @throws IllegalStateException if the job is submitted
882   */
883  public void setOutputKeyClass(Class<?> theClass
884                                ) throws IllegalStateException {
885    ensureState(JobState.DEFINE);
886    conf.setOutputKeyClass(theClass);
887  }
888
889  /**
890   * Set the value class for job outputs.
891   * 
892   * @param theClass the value class for job outputs.
893   * @throws IllegalStateException if the job is submitted
894   */
895  public void setOutputValueClass(Class<?> theClass
896                                  ) throws IllegalStateException {
897    ensureState(JobState.DEFINE);
898    conf.setOutputValueClass(theClass);
899  }
900
901  /**
902   * Define the comparator that controls how the keys are sorted before they
903   * are passed to the {@link Reducer}.
904   * @param cls the raw comparator
905   * @throws IllegalStateException if the job is submitted
906   */
907  public void setSortComparatorClass(Class<? extends RawComparator> cls
908                                     ) throws IllegalStateException {
909    ensureState(JobState.DEFINE);
910    conf.setOutputKeyComparatorClass(cls);
911  }
912
913  /**
914   * Define the comparator that controls which keys are grouped together
915   * for a single call to 
916   * {@link Reducer#reduce(Object, Iterable, 
917   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
918   * @param cls the raw comparator to use
919   * @throws IllegalStateException if the job is submitted
920   */
921  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
922                                         ) throws IllegalStateException {
923    ensureState(JobState.DEFINE);
924    conf.setOutputValueGroupingComparator(cls);
925  }
926
927  /**
928   * Set the user-specified job name.
929   * 
930   * @param name the job's new name.
931   * @throws IllegalStateException if the job is submitted
932   */
933  public void setJobName(String name) throws IllegalStateException {
934    ensureState(JobState.DEFINE);
935    conf.setJobName(name);
936  }
937
  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }
948
  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }
960
  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }
972
  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }
984
  /**
   * Set the given set of archives to be localized via the distributed cache.
   *
   * @param archives The list of archives that need to be localized
   * @throws IllegalStateException if the job is submitted
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }
993
  /**
   * Set the given set of files to be localized via the distributed cache.
   *
   * @param files The list of files that need to be localized
   * @throws IllegalStateException if the job is submitted
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }
1002
  /**
   * Add an archive to be localized via the distributed cache.
   *
   * @param uri The uri of the archive to be localized
   * @throws IllegalStateException if the job is submitted
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }
1011  
  /**
   * Add a file to be localized via the distributed cache.
   *
   * @param uri The uri of the file to be localized
   * @throws IllegalStateException if the job is submitted
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }
1020
  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to
   * the classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   * @throws IOException if the file's filesystem cannot be resolved
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }
1037
1038  /**
1039   * Add an archive path to the current set of classpath entries. It adds the
1040   * archive to cache as well.
1041   * 
1042   * Archive files will be unpacked and added to the classpath
1043   * when being distributed.
1044   *
1045   * @param archive Path of the archive to be added
1046   */
1047  public void addArchiveToClassPath(Path archive)
1048    throws IOException {
1049    ensureState(JobState.DEFINE);
1050    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
1051  }
1052
  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   *
   * @deprecated symlinks can no longer be disabled, so calling this is
   *             unnecessary.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }
1062  
1063  /** 
1064   * Expert: Set the number of maximum attempts that will be made to run a
1065   * map task.
1066   * 
1067   * @param n the number of attempts per map task.
1068   */
1069  public void setMaxMapAttempts(int n) {
1070    ensureState(JobState.DEFINE);
1071    conf.setMaxMapAttempts(n);
1072  }
1073
1074  /** 
1075   * Expert: Set the number of maximum attempts that will be made to run a
1076   * reduce task.
1077   * 
1078   * @param n the number of attempts per reduce task.
1079   */
1080  public void setMaxReduceAttempts(int n) {
1081    ensureState(JobState.DEFINE);
1082    conf.setMaxReduceAttempts(n);
1083  }
1084
  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   *
   * @param newValue true means it should be gathered
   * @throws IllegalStateException if the job is submitted
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }
1095
1096  /**
1097   * Set the profiler configuration arguments. If the string contains a '%s' it
1098   * will be replaced with the name of the profiling output file when the task
1099   * runs.
1100   *
1101   * This value is passed to the task child JVM on the command line.
1102   *
1103   * @param value the configuration string
1104   */
1105  public void setProfileParams(String value) {
1106    ensureState(JobState.DEFINE);
1107    conf.setProfileParams(value);
1108  }
1109
  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   *
   * @param isMap true to set the range for map tasks, false for reduce tasks
   * @param newValue a set of integer ranges of the task ids
   * @throws IllegalStateException if the job is submitted
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }
1119
  /**
   * Fail if the given configuration attribute is set, reporting which API
   * mode it conflicts with.
   *
   * @param attr configuration key that must be absent
   * @param msg name of the mode the attribute is incompatible with
   * @throws IOException if <code>attr</code> is present in the configuration
   */
  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }    
  }
1125  
  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   *
   * @param value true to cancel delegation tokens when the job completes
   * @throws IllegalStateException if the job is submitted
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }
1134
  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reduce attributes are used.
   * @throws IOException if the configuration is inconsistent (mixes old- and
   *         new-API attributes)
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    // Use the new mapper API unless an old-API mapper class was configured.
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      // New-API map side: none of the old-API map attributes may be present.
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
       } else {
        // Map-only job: the map output is the job output, so the old
        // output format must not be set either.
        ensureNotSet("mapred.output.format.class", mode);
      }      
    } else {
      // Old-API map side: the new-API map attributes must be absent.
      String mode = "map compatability";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
       } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    // Reduce-side checks only apply when the job actually has reducers.
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);   
      } else {
        String mode = "reduce compatability";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);   
      }
    }   
  }
1179
  /**
   * Lazily establish the {@link Cluster} connection, constructing it inside
   * a doAs so it runs with this job's UGI credentials. No-op if a cluster
   * connection already exists; synchronized so only one is ever created.
   */
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
1193
  /** @return true if {@link #connect()} has already created the cluster. */
  boolean isConnected() {
    return cluster != null;
  }
1197
  /**
   * Only for mocking via unit tests.
   *
   * @param fs the filesystem handed to the {@link JobSubmitter}
   * @param submitClient the client protocol handed to the {@link JobSubmitter}
   * @return a new {@link JobSubmitter}
   * @throws IOException if the submitter cannot be constructed
   */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs, 
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }
1204  /**
1205   * Submit the job to the cluster and return immediately.
1206   * @throws IOException
1207   */
1208  public void submit() 
1209         throws IOException, InterruptedException, ClassNotFoundException {
1210    ensureState(JobState.DEFINE);
1211    setUseNewAPI();
1212    connect();
1213    final JobSubmitter submitter = 
1214        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
1215    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
1216      public JobStatus run() throws IOException, InterruptedException, 
1217      ClassNotFoundException {
1218        return submitter.submitJobInternal(Job.this, cluster);
1219      }
1220    });
1221    state = JobState.RUNNING;
1222    LOG.info("The url to track the job: " + getTrackingURL());
1223   }
1224  
  /**
   * Submit the job to the cluster and wait for it to finish.
   *
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   * @throws InterruptedException if the monitoring is interrupted
   * @throws ClassNotFoundException if job classes cannot be loaded on submit
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // Submit lazily so the method can be called on an already-running job.
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // NOTE(review): the interrupt is deliberately swallowed so polling
          // continues until the job completes; the thread's interrupt status
          // is NOT restored here -- confirm this is intended.
        }
      }
    }
    return isSuccessful();
  }
1253  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   *
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   * @throws InterruptedException if the polling sleep is interrupted
   */
  public boolean monitorAndPrintJob() 
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis = 
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        // One final pass after completion so 100% progress gets reported.
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      // Nothing to report while the job is still in the PREP state.
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }      
      // Log the uber-mode decision only once, the first time the job is
      // seen past PREP.
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }      
      String report = 
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " + 
            StringUtils.formatPercent(reduceProgress(), 0));
      // Only log when the formatted progress actually changed.
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      // Fetch and print the next batch (up to 10) of task-completion events.
      TaskCompletionEvent[] events = 
        getTaskCompletionEvents(eventCounter, 10); 
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() + 
          " due to: " + status.getFailureInfo());
    }
    // Dump the final counters, if the cluster returned any.
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
1317
1318  /**
1319   * @return true if the profile parameters indicate that this is using
1320   * hprof, which generates profile files in a particular location
1321   * that we can retrieve to the client.
1322   */
1323  private boolean shouldDownloadProfile() {
1324    // Check the argument string that was used to initialize profiling.
1325    // If this indicates hprof and file-based output, then we're ok to
1326    // download.
1327    String profileParams = getProfileParams();
1328
1329    if (null == profileParams) {
1330      return false;
1331    }
1332
1333    // Split this on whitespace.
1334    String [] parts = profileParams.split("[ \\t]+");
1335
1336    // If any of these indicate hprof, and the use of output files, return true.
1337    boolean hprofFound = false;
1338    boolean fileFound = false;
1339    for (String p : parts) {
1340      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
1341        hprofFound = true;
1342
1343        // This contains a number of comma-delimited components, one of which
1344        // may specify the file to write to. Make sure this is present and
1345        // not empty.
1346        String [] subparts = p.split(",");
1347        for (String sub : subparts) {
1348          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
1349            fileFound = true;
1350          }
1351        }
1352      }
1353    }
1354
1355    return hprofFound && fileFound;
1356  }
1357
1358  private void printTaskEvents(TaskCompletionEvent[] events,
1359      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
1360      IntegerRanges reduceRanges) throws IOException, InterruptedException {
1361    for (TaskCompletionEvent event : events) {
1362      switch (filter) {
1363      case NONE:
1364        break;
1365      case SUCCEEDED:
1366        if (event.getStatus() == 
1367          TaskCompletionEvent.Status.SUCCEEDED) {
1368          LOG.info(event.toString());
1369        }
1370        break; 
1371      case FAILED:
1372        if (event.getStatus() == 
1373          TaskCompletionEvent.Status.FAILED) {
1374          LOG.info(event.toString());
1375          // Displaying the task diagnostic information
1376          TaskAttemptID taskId = event.getTaskAttemptId();
1377          String[] taskDiagnostics = getTaskDiagnostics(taskId); 
1378          if (taskDiagnostics != null) {
1379            for (String diagnostics : taskDiagnostics) {
1380              System.err.println(diagnostics);
1381            }
1382          }
1383        }
1384        break; 
1385      case KILLED:
1386        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
1387          LOG.info(event.toString());
1388        }
1389        break; 
1390      case ALL:
1391        LOG.info(event.toString());
1392        break;
1393      }
1394    }
1395  }
1396
1397  /** The interval at which monitorAndPrintJob() prints status */
1398  public static int getProgressPollInterval(Configuration conf) {
1399    // Read progress monitor poll interval from config. Default is 1 second.
1400    int progMonitorPollIntervalMillis = conf.getInt(
1401      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
1402    if (progMonitorPollIntervalMillis < 1) {
1403      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
1404        " has been set to an invalid value; "
1405        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
1406      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
1407    }
1408    return progMonitorPollIntervalMillis;
1409  }
1410
1411  /** The interval at which waitForCompletion() should check. */
1412  public static int getCompletionPollInterval(Configuration conf) {
1413    int completionPollIntervalMillis = conf.getInt(
1414      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
1415    if (completionPollIntervalMillis < 1) { 
1416      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
1417       " has been set to an invalid value; "
1418       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
1419      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
1420    }
1421    return completionPollIntervalMillis;
1422  }
1423
1424  /**
1425   * Get the task output filter.
1426   * 
1427   * @param conf the configuration.
1428   * @return the filter level.
1429   */
1430  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
1431    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
1432  }
1433
1434  /**
1435   * Modify the Configuration to set the task output filter.
1436   * 
1437   * @param conf the Configuration to modify.
1438   * @param newValue the value to set.
1439   */
1440  public static void setTaskOutputFilter(Configuration conf, 
1441      TaskStatusFilter newValue) {
1442    conf.set(Job.OUTPUT_FILTER, newValue.toString());
1443  }
1444
  /**
   * Refreshes the job status from the cluster and reports whether it is
   * running in uber mode.
   *
   * @return the uber-mode flag from the latest job status
   * @throws IOException if the status cannot be refreshed
   * @throws InterruptedException if the status refresh is interrupted
   */
  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }
1450  
1451}