/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
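   *
   * <p>A minimal sketch of the copy semantics described above; the property
   * name is illustrative only:</p>
   * <p><blockquote><pre>
   *     Configuration conf = new Configuration();
   *     Job job = Job.getInstance(conf);
   *     conf.set("example.property", "changed");
   *     // the job is unaffected: it holds its own copy of conf
   *     assert job.getConfiguration().get("example.property") == null;
   * </pre></blockquote></p>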
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the name of the job
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the cluster argument, which is ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored the cluster argument, which is ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} bound to the given {@link Cluster} with the
   * given {@link Configuration} and {@link JobStatus}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, bound to the given cluster
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus() throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately, so refresh it
   * right away.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: report whatever status information is already cached
    } catch (InterruptedException ie) {
      // ignore: report whatever status information is already cached
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return a message describing the task attempt that caused the job to fail
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    // count the failed task-completion events actually observed
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "Job is failed due to some other reason and reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    // strip the trailing "_<attempt>" suffix to recover the task id
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times " + "For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given type
   * @throws IOException
   * @throws InterruptedException
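   *
   * <p>For example, map-task progress could be summarized roughly like this
   * (a sketch, assuming the job is already running):</p>
   * <p><blockquote><pre>
   *     for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre></blockquote></p>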
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   * @throws InterruptedException
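   *
   * <p>For example:</p>
   * <p><blockquote><pre>
   *     job.setPriority(JobPriority.HIGH);
   * </pre></blockquote></p>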
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   * @throws InterruptedException
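   *
   * <p>Events can be fetched in pages; a sketch of draining all of them:</p>
   * <p><blockquote><pre>
   *     int from = 0;
   *     TaskCompletionEvent[] events;
   *     do {
   *       events = job.getTaskCompletionEvents(from, 10);
   *       from += events.length;
   *     } while (events.length &gt; 0);
   * </pre></blockquote></p>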
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   * @return <code>true</code> if the operation was successful
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
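   *
   * <p>For example, to read a built-in counter once the job has finished
   * (a sketch; the null check is omitted for brevity):</p>
   * <p><blockquote><pre>
   *     Counters counters = job.getCounters();
   *     long inputRecords =
   *         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   * </pre></blockquote></p>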
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)} method.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
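   *
   * <p>Together with {@link #setSortComparatorClass(Class)} this supports the
   * usual secondary-sort pattern. In the sketch below,
   * <code>FullKeyComparator</code> and <code>NaturalKeyComparator</code> are
   * hypothetical user-supplied classes:</p>
   * <p><blockquote><pre>
   *     job.setSortComparatorClass(FullKeyComparator.class);
   *     job.setGroupingComparatorClass(NaturalKeyComparator.class);
   * </pre></blockquote></p>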
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
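   *
   * <p>A URI fragment names the symlink created for the file in the task's
   * working directory; the path below is illustrative only:</p>
   * <p><blockquote><pre>
   *     job.addCacheFile(new URI("/data/lookup.dat#lookup"));
   * </pre></blockquote></p>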
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf,
        archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
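   *
   * <p>For example, HPROF-style settings along these lines are a common
   * choice (shown for illustration only):</p>
   * <p><blockquote><pre>
   *     job.setProfileParams(
   *         "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
   *         "verbose=n,file=%s");
   * </pre></blockquote></p>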
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap whether the range applies to map tasks (<code>true</code>)
   *              or reduce tasks (<code>false</code>)
   * @param newValue a set of integer ranges of the task ids
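   *
   * <p>For example, to profile the first three map tasks (a sketch):</p>
   * <p><blockquote><pre>
   *     job.setProfileEnabled(true);
   *     job.setProfileTaskRange(true, "0-2");
   * </pre></blockquote></p>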
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   * @param value whether delegation tokens should be canceled when the job
   *              completes
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
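   *
   * <p>A typical driver ends along these lines (a common convention, not a
   * requirement):</p>
   * <p><blockquote><pre>
   *     System.exit(job.waitForCompletion(true) ? 0 : 1);
   * </pre></blockquote></p>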
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and poll again
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0) +
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
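   *
   * <p>For example, to have {@link #monitorAndPrintJob()} log every task
   * completion event:</p>
   * <p><blockquote><pre>
   *     Job.setTaskOutputFilter(conf, TaskStatusFilter.ALL);
   * </pre></blockquote></p>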
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation, or null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}