/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING}
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
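   * <p>A minimal sketch of the copy semantics (key and value are
   * illustrative):</p>
   * <pre>
   *   Configuration conf = new Configuration();
   *   Job job = Job.getInstance(conf);
   *   // the job holds its own copy, so this change is not seen by the job
   *   conf.set("example.key", "example.value");
   * </pre>
   *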
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given jobName. A Cluster will be created from the conf parameter
   * only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration} when
   * it's needed.
   *
   * @param ignored cluster, ignored
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored cluster, ignored
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} attached to the given {@link Cluster}, with
   * the given {@link Configuration} and {@link JobStatus}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, attached to the given cluster
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need to update status immediately, so they refresh it
   * unconditionally.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority info of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: toString() is best-effort
    } catch (InterruptedException ie) {
      // ignore: toString() is best-effort
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 0;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason, which "
          + "can be found in the logs.";
    }
    // drop the "attempt_" prefix and the trailing attempt number to recover
    // the task id from the task attempt id
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the list of task reports for the given type of task
   * @throws IOException
   */
  public TaskReport[] getTaskReports(final TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), type);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of the job.
   * @param jobPriority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
        conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
      } else {
        conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
            .valueOf(jobPriority.name()));
      }
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = convertPriorityToInteger(jobPriority);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  /**
   * Set the priority of the job as an integer.
   *
   * @param jobPriority
   *          the new priority for the job.
   * @throws IOException
   */
  public void setPriorityAsInteger(int jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriorityAsInteger(jobPriority);
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = jobPriority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  private int convertPriorityToInteger(JobPriority jobPriority) {
    switch (jobPriority) {
    case VERY_HIGH:
      return 5;
    case HIGH:
      return 4;
    case NORMAL:
      return 3;
    case LOW:
      return 2;
    case VERY_LOW:
      return 1;
    case DEFAULT:
      return 0;
    default:
      break;
    }
    // For UNDEFINED_PRIORITY, we can set it to default for better handling
    return 0;
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
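   *
   * <p>A polling sketch that drains all available events in pages of 10
   * (assumes a submitted job):</p>
   * <pre>
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from, 10);
   *     from += events.length;
   *   } while (events.length > 0);
   * </pre>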
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
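   *
   * <p>A sketch of reading a built-in counter once the job has run:</p>
   * <pre>
   *   Counters counters = job.getCounters();
   *   if (counters != null) {
   *     long mapInputRecords =
   *         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   *   }
   * </pre>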
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path of the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
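   *
   * <p>For example, {@link org.apache.hadoop.io.Text} keys can be sorted
   * with the stock comparator (a sketch):</p>
   * <pre>
   *   job.setSortComparatorClass(Text.Comparator.class);
   * </pre>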
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter};
   *               otherwise they are ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
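   *
   * <p>A sketch; the path is illustrative, and the URI fragment names the
   * symlink created in the task's working directory:</p>
   * <pre>
   *   job.addCacheFile(new URI("/user/me/lookup.txt#lookup"));
   * </pre>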
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
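   *
   * <p>A typical HPROF sketch (the arguments are illustrative):</p>
   * <pre>
   *   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,force=n,"
   *       + "thread=y,verbose=n,file=%s");
   * </pre>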
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap <code>true</code> to set the range for map tasks,
   *              <code>false</code> for reduce tasks
   * @param newValue a set of integer ranges of the task ids
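   *
   * <p>For example, to profile the first three map tasks (a sketch):</p>
   * <pre>
   *   job.setProfileEnabled(true);
   *   job.setProfileTaskRange(true, "0-2");
   * </pre>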
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
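   *
   * <p>A non-blocking sketch: submit, then poll at the client's configured
   * completion-poll interval:</p>
   * <pre>
   *   job.submit();
   *   while (!job.isComplete()) {
   *     Thread.sleep(Job.getCompletionPollInterval(job.getConfiguration()));
   *   }
   * </pre>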
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and keep polling
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0) +
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
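   *
   * <p>For example, to log every task-completion event on the client
   * (a sketch):</p>
   * <pre>
   *   Job.setTaskOutputFilter(conf, TaskStatusFilter.ALL);
   * </pre>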
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId the identifier of the job's reservation, null if
   *         the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
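   *
   * <p>A sketch, assuming a reservation was previously obtained from the
   * YARN reservation system:</p>
   * <pre>
   *   job.setReservationId(reservationId);
   *   job.submit();
   * </pre>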
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}