/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the job, submit it, control its
 * execution, and query its state. The set methods only work until the job
 * is submitted; afterwards they will throw an IllegalStateException.</p>
 *
 * <p>Normally the user creates the application, describes various facets of
 * the job via {@link Job}, and then submits the job and monitors its
 * progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
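
  /*
   * A client can tune both poll intervals in its Configuration before the
   * job is monitored. A minimal sketch (the values are illustrative):
   *
   *   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);       // check completion every 10s
   *   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);  // print progress every 2s
   */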

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
           "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
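   * <p>A minimal sketch of the copy semantics (the property name is
   * illustrative):</p>
   * <pre>
   *   Configuration conf = new Configuration();
   *   Job job = Job.getInstance(conf);
   *   conf.set("example.property", "changed-later");
   *   // job.getConfiguration() still holds the values copied at creation
   * </pre>
   *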
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job instance's name
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
  throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the job's own Configuration only when
   * it's needed.
   *
   * @param ignored cluster parameter; it is not used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored cluster parameter; it is not used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster the cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /** Some methods need the status to be updated immediately, so refresh it
   * right away.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonForFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore: fall back to the defaults gathered so far
    } catch (InterruptedException ie) {
      // ignore: fall back to the defaults gathered so far
    }
    StringBuilder sb = new StringBuilder();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed due to some other reason, and the reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get information on the current state of the tasks of a job.
   *
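   * <p>For example, printing per-task progress (a sketch):</p>
   * <pre>
   *   for (TaskReport r : job.getTaskReports(TaskType.MAP)) {
   *     System.out.println(r.getProgress());
   *   }
   * </pre>
   *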
   * @param type Type of the task
   * @return the list of task reports for the given type of task
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
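   *
   * <p>For example:</p>
   * <pre>
   *   job.setPriority(JobPriority.HIGH);
   * </pre>
   *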
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
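   * <p>A sketch of paging through all available events (the window size of
   * 10 is illustrative):</p>
   * <pre>
   *   int from = 0;
   *   TaskCompletionEvent[] batch;
   *   do {
   *     batch = job.getTaskCompletionEvents(from, 10);
   *     from += batch.length;
   *   } while (batch.length > 0);
   * </pre>
   *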
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   without affecting job failure status.
   * @return <code>true</code> if the call was successful.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path of the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner,
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}.
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
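   *
   * <p>A common use is secondary sort: sort on the full composite key but
   * group on its natural part. A sketch (the comparator classes are
   * illustrative):</p>
   * <pre>
   *   job.setSortComparatorClass(FullKeyComparator.class);
   *   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre>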
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup are needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
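   *
   * <p>For example (the URI is illustrative; the fragment after '#' names
   * the symlink created in the task's working directory):</p>
   * <pre>
   *   job.addCacheFile(new URI("/data/lookup.txt#lookup"));
   * </pre>
   *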
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to the classpath, use the
   * {@link #addArchiveToClassPath(Path)} method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
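   *
   * <p>For example, an hprof-style setting (the argument string is
   * illustrative):</p>
   * <pre>
   *   job.setProfileParams(
   *       "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
   * </pre>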
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
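   *
   * <p>For example (the range string is illustrative):</p>
   * <pre>
   *   job.setProfileEnabled(true);
   *   job.setProfileTaskRange(true, "0-2");  // profile map tasks 0 through 2
   * </pre>
   *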
   * @param isMap whether the ranges apply to map tasks (<code>true</code>)
   *              or reduce tasks (<code>false</code>)
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
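   *
   * <p>A sketch of non-blocking submission followed by manual polling, as an
   * alternative to {@link #waitForCompletion(boolean)} (the sleep interval is
   * illustrative):</p>
   * <pre>
   *   job.submit();
   *   while (!job.isComplete()) {
   *     Thread.sleep(5000);
   *   }
   *   System.out.println("Succeeded: " + job.isSuccessful());
   * </pre>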
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The URL to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and keep polling
        }
      }
    }
    return isSuccessful();
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " +
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
        getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String [] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String [] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status. */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
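   * <p>For example, to log every task completion event during monitoring:</p>
   * <pre>
   *   Job.setTaskOutputFilter(conf, TaskStatusFilter.ALL);
   * </pre>
   *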
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the reservationId, the identifier of the job's reservation; null
   *         if the job does not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }

}