001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.IOException;
022    import java.net.URI;
023    import java.security.PrivilegedExceptionAction;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.classification.InterfaceAudience.Private;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.conf.Configuration.IntegerRanges;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.RawComparator;
035    import org.apache.hadoop.mapred.JobConf;
036    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
037    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038    import org.apache.hadoop.mapreduce.task.JobContextImpl;
039    import org.apache.hadoop.mapreduce.util.ConfigUtil;
040    import org.apache.hadoop.util.StringUtils;
041    import org.apache.hadoop.yarn.api.records.ReservationId;
042    
043    /**
044     * The job submitter's view of the Job.
045     * 
046     * <p>It allows the user to configure the
047     * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.</p>
050     * 
051     * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
054     * 
055     * <p>Here is an example on how to submit a job:</p>
056     * <p><blockquote><pre>
057     *     // Create a new Job
058     *     Job job = Job.getInstance();
059     *     job.setJarByClass(MyJob.class);
060     *     
061     *     // Specify various job-specific parameters     
062     *     job.setJobName("myjob");
063     *     
 *     FileInputFormat.addInputPath(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
066     *     
067     *     job.setMapperClass(MyJob.MyMapper.class);
068     *     job.setReducerClass(MyJob.MyReducer.class);
069     *
070     *     // Submit the job, then poll for progress until the job is complete
071     *     job.waitForCompletion(true);
072     * </pre></blockquote></p>
 */
076    @InterfaceAudience.Public
077    @InterfaceStability.Evolving
078    public class Job extends JobContextImpl implements JobContext {  
079      private static final Log LOG = LogFactory.getLog(Job.class);
080    
081      @InterfaceStability.Evolving
082      public static enum JobState {DEFINE, RUNNING};
083      private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
084      public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
086      public static final String COMPLETION_POLL_INTERVAL_KEY = 
087        "mapreduce.client.completion.pollinterval";
088      
089      /** Default completionPollIntervalMillis is 5000 ms. */
090      static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
091      /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
092      public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
093        "mapreduce.client.progressmonitor.pollinterval";
094      /** Default progMonitorPollIntervalMillis is 1000 ms. */
095      static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
096    
097      public static final String USED_GENERIC_PARSER = 
098        "mapreduce.client.genericoptionsparser.used";
099      public static final String SUBMIT_REPLICATION = 
100        "mapreduce.client.submit.file.replication";
101      private static final String TASKLOG_PULL_TIMEOUT_KEY =
102               "mapreduce.client.tasklog.timeout";
103      private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
104    
105      @InterfaceStability.Evolving
106      public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
107    
108      static {
109        ConfigUtil.loadResources();
110      }
111    
112      private JobState state = JobState.DEFINE;
113      private JobStatus status;
114      private long statustime;
115      private Cluster cluster;
116      private ReservationId reservationId;
117    
118      /**
119       * @deprecated Use {@link #getInstance()}
120       */
121      @Deprecated
122      public Job() throws IOException {
123        this(new Configuration());
124      }
125    
126      /**
127       * @deprecated Use {@link #getInstance(Configuration)}
128       */
129      @Deprecated
130      public Job(Configuration conf) throws IOException {
131        this(new JobConf(conf));
132      }
133    
134      /**
135       * @deprecated Use {@link #getInstance(Configuration, String)}
136       */
137      @Deprecated
138      public Job(Configuration conf, String jobName) throws IOException {
139        this(conf);
140        setJobName(jobName);
141      }
142    
143      Job(JobConf conf) throws IOException {
144        super(conf, null);
145        // propagate existing user credentials to job
146        this.credentials.mergeAll(this.ugi.getCredentials());
147        this.cluster = null;
148      }
149    
150      Job(JobStatus status, JobConf conf) throws IOException {
151        this(conf);
152        setJobID(status.getJobID());
153        this.status = status;
154        state = JobState.RUNNING;
155      }
156    
157          
158      /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   * 
   * @return the {@link Job}, with no connection to a cluster yet.
163       * @throws IOException
164       */
165      public static Job getInstance() throws IOException {
166        // create with a null Cluster
167        return getInstance(new Configuration());
168      }
169          
170      /**
171       * Creates a new {@link Job} with no particular {@link Cluster} and a 
172       * given {@link Configuration}.
173       * 
174       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
175       * that any necessary internal modifications do not reflect on the incoming 
176       * parameter.
177       * 
178       * A Cluster will be created from the conf parameter only when it's needed.
179       * 
180       * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
182       * @throws IOException
183       */
184      public static Job getInstance(Configuration conf) throws IOException {
185        // create with a null Cluster
186        JobConf jobConf = new JobConf(conf);
187        return new Job(jobConf);
188      }
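
  /*
   * Illustrative sketch (not part of this class): because getInstance copies
   * the Configuration, later changes to the caller's conf are not seen by the
   * job. The property name "example.key" is hypothetical.
   *
   *   Configuration conf = new Configuration();
   *   conf.set("example.key", "before");
   *   Job job = Job.getInstance(conf);
   *   conf.set("example.key", "after");          // does not affect the job
   *   job.getConfiguration().get("example.key"); // still "before"
   */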
189    
190          
191      /**
192       * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
193       * A Cluster will be created from the conf parameter only when it's needed.
194       *
195       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
196       * that any necessary internal modifications do not reflect on the incoming 
197       * parameter.
198       * 
199       * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
201       * @throws IOException
202       */
203      public static Job getInstance(Configuration conf, String jobName)
204               throws IOException {
205        // create with a null Cluster
206        Job result = getInstance(conf);
207        result.setJobName(jobName);
208        return result;
209      }
210      
211      /**
212       * Creates a new {@link Job} with no particular {@link Cluster} and given
213       * {@link Configuration} and {@link JobStatus}.
214       * A Cluster will be created from the conf parameter only when it's needed.
215       * 
216       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
217       * that any necessary internal modifications do not reflect on the incoming 
218       * parameter.
219       * 
220       * @param status job status
221       * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
223       * @throws IOException
224       */
225      public static Job getInstance(JobStatus status, Configuration conf) 
226      throws IOException {
227        return new Job(status, new JobConf(conf));
228      }
229    
230      /**
231       * Creates a new {@link Job} with no particular {@link Cluster}.
232       * A Cluster will be created from the conf parameter only when it's needed.
233       *
234       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
235       * that any necessary internal modifications do not reflect on the incoming 
236       * parameter.
237       * 
   * @param ignored cluster to be ignored; present only for backwards
   *        compatibility
   * @return the {@link Job}, with no connection to a cluster yet.
240       * @throws IOException
241       * @deprecated Use {@link #getInstance()}
242       */
243      @Deprecated
244      public static Job getInstance(Cluster ignored) throws IOException {
245        return getInstance();
246      }
247      
248      /**
249       * Creates a new {@link Job} with no particular {@link Cluster} and given
250       * {@link Configuration}.
251       * A Cluster will be created from the conf parameter only when it's needed.
252       * 
253       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
254       * that any necessary internal modifications do not reflect on the incoming 
255       * parameter.
256       * 
   * @param ignored cluster to be ignored; present only for backwards
   *        compatibility
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
260       * @throws IOException
261       * @deprecated Use {@link #getInstance(Configuration)}
262       */
263      @Deprecated
264      public static Job getInstance(Cluster ignored, Configuration conf) 
265          throws IOException {
266        return getInstance(conf);
267      }
268      
269      /**
270       * Creates a new {@link Job} with no particular {@link Cluster} and given
271       * {@link Configuration} and {@link JobStatus}.
272       * A Cluster will be created from the conf parameter only when it's needed.
273       * 
274       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
275       * that any necessary internal modifications do not reflect on the incoming 
276       * parameter.
277       * 
278       * @param cluster cluster
279       * @param status job status
280       * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
282       * @throws IOException
283       */
284      @Private
285      public static Job getInstance(Cluster cluster, JobStatus status, 
286          Configuration conf) throws IOException {
287        Job job = getInstance(status, conf);
288        job.setCluster(cluster);
289        return job;
290      }
291    
292      private void ensureState(JobState state) throws IllegalStateException {
293        if (state != this.state) {
294          throw new IllegalStateException("Job in state "+ this.state + 
295                                          " instead of " + state);
296        }
297    
298        if (state == JobState.RUNNING && cluster == null) {
299          throw new IllegalStateException
300            ("Job in state " + this.state
301             + ", but it isn't attached to any job tracker!");
302        }
303      }
304    
305      /**
   * Some methods rely on having a recent job status object. Refresh
   * it, if necessary.
308       */
309      synchronized void ensureFreshStatus() 
310          throws IOException {
311        if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
312          updateStatus();
313        }
314      }
315        
  /** Some methods need an up-to-date job status immediately, so they
   * refresh it unconditionally.
318       * @throws IOException
319       */
320      synchronized void updateStatus() throws IOException {
321        try {
322          this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
323            @Override
324            public JobStatus run() throws IOException, InterruptedException {
325              return cluster.getClient().getJobStatus(status.getJobID());
326            }
327          });
328        }
329        catch (InterruptedException ie) {
330          throw new IOException(ie);
331        }
332        if (this.status == null) {
      throw new IOException("Job status not available");
334        }
335        this.statustime = System.currentTimeMillis();
336      }
337      
338      public JobStatus getStatus() throws IOException, InterruptedException {
339        ensureState(JobState.RUNNING);
340        updateStatus();
341        return status;
342      }
343      
344      private void setStatus(JobStatus status) {
345        this.status = status;
346      }
347    
348      /**
349       * Returns the current state of the Job.
350       * 
   * @return the current {@link JobStatus.State} of the job
352       * @throws IOException
353       * @throws InterruptedException
354       */
355      public JobStatus.State getJobState() 
356          throws IOException, InterruptedException {
357        ensureState(JobState.RUNNING);
358        updateStatus();
359        return status.getState();
360      }
361      
362      /**
363       * Get the URL where some job progress information will be displayed.
364       * 
365       * @return the URL where some job progress information will be displayed.
366       */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl();
  }
371    
372      /**
373       * Get the path of the submitted job configuration.
374       * 
375       * @return the path of the submitted job configuration.
376       */
377      public String getJobFile() {
378        ensureState(JobState.RUNNING);
379        return status.getJobFile();
380      }
381    
382      /**
383       * Get start time of the job.
384       * 
385       * @return the start time of the job
386       */
387      public long getStartTime() {
388        ensureState(JobState.RUNNING);
389        return status.getStartTime();
390      }
391    
392      /**
393       * Get finish time of the job.
394       * 
395       * @return the finish time of the job
396       */
397      public long getFinishTime() throws IOException, InterruptedException {
398        ensureState(JobState.RUNNING);
399        updateStatus();
400        return status.getFinishTime();
401      }
402    
403      /**
404       * Get scheduling info of the job.
405       * 
406       * @return the scheduling info of the job
407       */
408      public String getSchedulingInfo() {
409        ensureState(JobState.RUNNING);
410        return status.getSchedulingInfo();
411      }
412    
413      /**
   * Get the priority of the job.
   * 
   * @return the priority of the job
417       */
418      public JobPriority getPriority() throws IOException, InterruptedException {
419        ensureState(JobState.RUNNING);
420        updateStatus();
421        return status.getPriority();
422      }
423    
424      /**
425       * The user-specified job name.
426       */
427      public String getJobName() {
428        if (state == JobState.DEFINE) {
429          return super.getJobName();
430        }
431        ensureState(JobState.RUNNING);
432        return status.getJobName();
433      }
434    
435      public String getHistoryUrl() throws IOException, InterruptedException {
436        ensureState(JobState.RUNNING);
437        updateStatus();
438        return status.getHistoryFile();
439      }
440    
441      public boolean isRetired() throws IOException, InterruptedException {
442        ensureState(JobState.RUNNING);
443        updateStatus();
444        return status.isRetired();
445      }
446      
447      @Private
448      public Cluster getCluster() {
449        return cluster;
450      }
451    
452      /** Only for mocks in unit tests. */
453      @Private
454      private void setCluster(Cluster cluster) {
455        this.cluster = cluster;
456      }
457    
458      /**
   * Return a human-readable summary of the job's status.
460       */
461      @Override
462      public String toString() {
463        ensureState(JobState.RUNNING);
    String reasonForFailure = " ";
465        int numMaps = 0;
466        int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED)) {
        reasonForFailure = getTaskFailureEventString();
      }
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore and report whatever information is already available
    } catch (InterruptedException ie) {
      // ignore and report whatever information is already available
    }
    StringBuilder sb = new StringBuilder();
477        sb.append("Job: ").append(status.getJobID()).append("\n");
478        sb.append("Job File: ").append(status.getJobFile()).append("\n");
479        sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
480        sb.append("\n");
481        sb.append("Uber job : ").append(status.isUber()).append("\n");
482        sb.append("Number of maps: ").append(numMaps).append("\n");
483        sb.append("Number of reduces: ").append(numReduces).append("\n");
484        sb.append("map() completion: ");
485        sb.append(status.getMapProgress()).append("\n");
486        sb.append("reduce() completion: ");
487        sb.append(status.getReduceProgress()).append("\n");
488        sb.append("Job state: ");
489        sb.append(status.getState()).append("\n");
490        sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonForFailure);
492        return sb.toString();
493      }
494    
495      /**
   * @return a message describing the task that caused the job failure
497       * @throws IOException
498       * @throws InterruptedException
499       */
500      String getTaskFailureEventString() throws IOException,
501          InterruptedException {
    int failCount = 0;
503        TaskCompletionEvent lastEvent = null;
504        TaskCompletionEvent[] events = ugi.doAs(new 
505            PrivilegedExceptionAction<TaskCompletionEvent[]>() {
506              @Override
507              public TaskCompletionEvent[] run() throws IOException,
508              InterruptedException {
509                return cluster.getClient().getTaskCompletionEvents(
510                    status.getJobID(), 0, 10);
511              }
512            });
513        for (TaskCompletionEvent event : events) {
514          if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
515            failCount++;
516            lastEvent = event;
517          }
518        }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "The job failed for some other reason; details "
          + "can be found in the logs.";
    }
    String taskID = lastEvent.getTaskAttemptId().getTaskID().toString();
    return (" task " + taskID + " failed " +
      failCount + " times. For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
529      }
530    
531      /**
532       * Get the information of the current state of the tasks of a job.
533       * 
534       * @param type Type of the task
   * @return the task reports for the given task type.
536       * @throws IOException
537       */
538      public TaskReport[] getTaskReports(TaskType type) 
539          throws IOException, InterruptedException {
540        ensureState(JobState.RUNNING);
541        final TaskType tmpType = type;
542        return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
543          public TaskReport[] run() throws IOException, InterruptedException {
544            return cluster.getClient().getTaskReports(getJobID(), tmpType);
545          }
546        });
547      }
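
  /*
   * Illustrative sketch (not part of this class): assuming the standard
   * TaskReport accessors getTaskID() and getProgress(), a client could
   * summarize map-task progress of a RUNNING job like this:
   *
   *   for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
   *     System.out.println(report.getTaskID() + " progress: "
   *         + report.getProgress());
   *   }
   */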
548    
549      /**
550       * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
551       * and 1.0.  When all map tasks have completed, the function returns 1.0.
552       * 
553       * @return the progress of the job's map-tasks.
554       * @throws IOException
555       */
556      public float mapProgress() throws IOException {
557        ensureState(JobState.RUNNING);
558        ensureFreshStatus();
559        return status.getMapProgress();
560      }
561    
562      /**
563       * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
564       * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
565       * 
566       * @return the progress of the job's reduce-tasks.
567       * @throws IOException
568       */
569      public float reduceProgress() throws IOException {
570        ensureState(JobState.RUNNING);
571        ensureFreshStatus();
572        return status.getReduceProgress();
573      }
574    
575      /**
576       * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
577       * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
578       * 
579       * @return the progress of the job's cleanup-tasks.
580       * @throws IOException
581       */
582      public float cleanupProgress() throws IOException, InterruptedException {
583        ensureState(JobState.RUNNING);
584        ensureFreshStatus();
585        return status.getCleanupProgress();
586      }
587    
588      /**
589       * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
590       * and 1.0.  When all setup tasks have completed, the function returns 1.0.
591       * 
592       * @return the progress of the job's setup-tasks.
593       * @throws IOException
594       */
595      public float setupProgress() throws IOException {
596        ensureState(JobState.RUNNING);
597        ensureFreshStatus();
598        return status.getSetupProgress();
599      }
600    
601      /**
602       * Check if the job is finished or not. 
603       * This is a non-blocking call.
604       * 
605       * @return <code>true</code> if the job is complete, else <code>false</code>.
606       * @throws IOException
607       */
608      public boolean isComplete() throws IOException {
609        ensureState(JobState.RUNNING);
610        updateStatus();
611        return status.isJobComplete();
612      }
613    
614      /**
615       * Check if the job completed successfully. 
616       * 
617       * @return <code>true</code> if the job succeeded, else <code>false</code>.
618       * @throws IOException
619       */
620      public boolean isSuccessful() throws IOException {
621        ensureState(JobState.RUNNING);
622        updateStatus();
623        return status.getState() == JobStatus.State.SUCCEEDED;
624      }
625    
626      /**
627       * Kill the running job.  Blocks until all job tasks have been
628       * killed as well.  If the job is no longer running, it simply returns.
629       * 
630       * @throws IOException
631       */
632      public void killJob() throws IOException {
633        ensureState(JobState.RUNNING);
634        try {
635          cluster.getClient().killJob(getJobID());
636        }
637        catch (InterruptedException ie) {
638          throw new IOException(ie);
639        }
640      }
641    
642      /**
643       * Set the priority of a running job.
644       * @param priority the new priority for the job.
645       * @throws IOException
646       */
647      public void setPriority(JobPriority priority) 
648          throws IOException, InterruptedException {
649        if (state == JobState.DEFINE) {
650          conf.setJobPriority(
651            org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
652        } else {
653          ensureState(JobState.RUNNING);
654          final JobPriority tmpPriority = priority;
655          ugi.doAs(new PrivilegedExceptionAction<Object>() {
656            @Override
657            public Object run() throws IOException, InterruptedException {
658              cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
659              return null;
660            }
661          });
662        }
663      }
664    
665      /**
666       * Get events indicating completion (success/failure) of component tasks.
667       *  
668       * @param startFrom index to start fetching events from
669       * @param numEvents number of events to fetch
670       * @return an array of {@link TaskCompletionEvent}s
671       * @throws IOException
672       */
673      public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
674          final int numEvents) throws IOException, InterruptedException {
675        ensureState(JobState.RUNNING);
676        return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
677          @Override
678          public TaskCompletionEvent[] run() throws IOException, InterruptedException {
679            return cluster.getClient().getTaskCompletionEvents(getJobID(),
680                startFrom, numEvents); 
681          }
682        });
683      }
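
  /*
   * Illustrative sketch (not part of this class): completion events arrive in
   * pages, so callers advance the start index by the number of events already
   * received, as monitorAndPrintJob() does further below:
   *
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from, 10);
   *     from += events.length;
   *     // inspect events here
   *   } while (events.length > 0);
   */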
684    
685      /**
686       * Get events indicating completion (success/failure) of component tasks.
687       *  
688       * @param startFrom index to start fetching events from
689       * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
690       * @throws IOException
691       */
692      public org.apache.hadoop.mapred.TaskCompletionEvent[]
693        getTaskCompletionEvents(final int startFrom) throws IOException {
694        try {
695          TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
696          org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
697              new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
698          for (int i = 0; i < events.length; i++) {
699            retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
700                (events[i]);
701          }
702          return retEvents;
703        } catch (InterruptedException ie) {
704          throw new IOException(ie);
705        }
706      }
707    
708      /**
709       * Kill indicated task attempt.
710       * @param taskId the id of the task to kill.
711       * @param shouldFail if <code>true</code> the task is failed and added
712       *                   to failed tasks list, otherwise it is just killed,
713       *                   w/o affecting job failure status.
714       */
715      @Private
716      public boolean killTask(final TaskAttemptID taskId,
717                              final boolean shouldFail) throws IOException {
718        ensureState(JobState.RUNNING);
719        try {
720          return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
721            public Boolean run() throws IOException, InterruptedException {
722              return cluster.getClient().killTask(taskId, shouldFail);
723            }
724          });
725        }
726        catch (InterruptedException ie) {
727          throw new IOException(ie);
728        }
729      }
730    
731      /**
732       * Kill indicated task attempt.
733       * 
734       * @param taskId the id of the task to be terminated.
735       * @throws IOException
736       */
737      public void killTask(final TaskAttemptID taskId)
738          throws IOException {
739        killTask(taskId, false);
740      }
741    
742      /**
743       * Fail indicated task attempt.
744       * 
745       * @param taskId the id of the task to be terminated.
746       * @throws IOException
747       */
748      public void failTask(final TaskAttemptID taskId)
749          throws IOException {
750        killTask(taskId, true);
751      }
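
  /*
   * Illustrative sketch (not part of this class): per killTask(TaskAttemptID,
   * boolean) above, killing an attempt does not affect the job failure
   * status, while failing it adds the attempt to the failed tasks list. The
   * attempt id is hypothetical.
   *
   *   TaskAttemptID attempt =
   *       TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");
   *   job.killTask(attempt);  // just killed, job failure status unaffected
   *   job.failTask(attempt);  // added to the failed tasks list
   */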
752    
753      /**
754       * Gets the counters for this job. May return null if the job has been
755       * retired and the job is no longer in the completed job store.
756       * 
757       * @return the counters for this job.
758       * @throws IOException
759       */
760      public Counters getCounters() 
761          throws IOException {
762        ensureState(JobState.RUNNING);
763        try {
764          return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
765            @Override
766            public Counters run() throws IOException, InterruptedException {
767              return cluster.getClient().getJobCounters(getJobID());
768            }
769          });
770        }
771        catch (InterruptedException ie) {
772          throw new IOException(ie);
773        }
774      }
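
  /*
   * Illustrative sketch (not part of this class): reading one of the built-in
   * counters after completion. The null check matters because a retired job
   * may no longer be in the completed job store.
   *
   *   Counters counters = job.getCounters();
   *   if (counters != null) {
   *     long inputRecords =
   *         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
   *   }
   */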
775    
776      /**
777       * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
779       * @return the list of diagnostic messages for the task
780       * @throws IOException
781       */
782      public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
783          throws IOException, InterruptedException {
784        ensureState(JobState.RUNNING);
785        return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
786          @Override
787          public String[] run() throws IOException, InterruptedException {
788            return cluster.getClient().getTaskDiagnostics(taskid);
789          }
790        });
791      }
792    
793      /**
794       * Set the number of reduce tasks for the job.
795       * @param tasks the number of reduce tasks
796       * @throws IllegalStateException if the job is submitted
797       */
798      public void setNumReduceTasks(int tasks) throws IllegalStateException {
799        ensureState(JobState.DEFINE);
800        conf.setNumReduceTasks(tasks);
801      }
802    
803      /**
804       * Set the current working directory for the default file system.
805       * 
806       * @param dir the new current working directory.
807       * @throws IllegalStateException if the job is submitted
808       */
809      public void setWorkingDirectory(Path dir) throws IOException {
810        ensureState(JobState.DEFINE);
811        conf.setWorkingDirectory(dir);
812      }
813    
814      /**
815       * Set the {@link InputFormat} for the job.
816       * @param cls the <code>InputFormat</code> to use
817       * @throws IllegalStateException if the job is submitted
818       */
819      public void setInputFormatClass(Class<? extends InputFormat> cls
820                                      ) throws IllegalStateException {
821        ensureState(JobState.DEFINE);
822        conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
823                      InputFormat.class);
824      }
825    
826      /**
827       * Set the {@link OutputFormat} for the job.
828       * @param cls the <code>OutputFormat</code> to use
829       * @throws IllegalStateException if the job is submitted
830       */
831      public void setOutputFormatClass(Class<? extends OutputFormat> cls
832                                       ) throws IllegalStateException {
833        ensureState(JobState.DEFINE);
834        conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
835                      OutputFormat.class);
836      }
837    
838      /**
839       * Set the {@link Mapper} for the job.
840       * @param cls the <code>Mapper</code> to use
841       * @throws IllegalStateException if the job is submitted
842       */
843      public void setMapperClass(Class<? extends Mapper> cls
844                                 ) throws IllegalStateException {
845        ensureState(JobState.DEFINE);
846        conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
847      }
848    
849      /**
850       * Set the Jar by finding where a given class came from.
851       * @param cls the example class
852       */
853      public void setJarByClass(Class<?> cls) {
854        ensureState(JobState.DEFINE);
855        conf.setJarByClass(cls);
856      }
857    
858      /**
   * Set the job jar.
   * @param jar the path to the job jar
860       */
861      public void setJar(String jar) {
862        ensureState(JobState.DEFINE);
863        conf.setJar(jar);
864      }
865    
866      /**
867       * Set the reported username for this job.
868       * 
869       * @param user the username for this job.
870       */
871      public void setUser(String user) {
872        ensureState(JobState.DEFINE);
873        conf.setUser(user);
874      }
875    
876      /**
877       * Set the combiner class for the job.
878       * @param cls the combiner to use
879       * @throws IllegalStateException if the job is submitted
880       */
881      public void setCombinerClass(Class<? extends Reducer> cls
882                                   ) throws IllegalStateException {
883        ensureState(JobState.DEFINE);
884        conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
885      }
886    
887      /**
888       * Set the {@link Reducer} for the job.
889       * @param cls the <code>Reducer</code> to use
890       * @throws IllegalStateException if the job is submitted
891       */
892      public void setReducerClass(Class<? extends Reducer> cls
893                                  ) throws IllegalStateException {
894        ensureState(JobState.DEFINE);
895        conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
896      }
897    
898      /**
899       * Set the {@link Partitioner} for the job.
900       * @param cls the <code>Partitioner</code> to use
901       * @throws IllegalStateException if the job is submitted
902       */
903      public void setPartitionerClass(Class<? extends Partitioner> cls
904                                      ) throws IllegalStateException {
905        ensureState(JobState.DEFINE);
906        conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
907                      Partitioner.class);
908      }
909    
910      /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
914       * 
915       * @param theClass the map output key class.
916       * @throws IllegalStateException if the job is submitted
917       */
918      public void setMapOutputKeyClass(Class<?> theClass
919                                       ) throws IllegalStateException {
920        ensureState(JobState.DEFINE);
921        conf.setMapOutputKeyClass(theClass);
922      }
923    
924      /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
928       * 
929       * @param theClass the map output value class.
930       * @throws IllegalStateException if the job is submitted
931       */
932      public void setMapOutputValueClass(Class<?> theClass
933                                         ) throws IllegalStateException {
934        ensureState(JobState.DEFINE);
935        conf.setMapOutputValueClass(theClass);
936      }
937    
938      /**
939       * Set the key class for the job output data.
940       * 
941       * @param theClass the key class for the job output data.
942       * @throws IllegalStateException if the job is submitted
943       */
944      public void setOutputKeyClass(Class<?> theClass
945                                    ) throws IllegalStateException {
946        ensureState(JobState.DEFINE);
947        conf.setOutputKeyClass(theClass);
948      }
949    
950      /**
951       * Set the value class for job outputs.
952       * 
953       * @param theClass the value class for job outputs.
954       * @throws IllegalStateException if the job is submitted
955       */
956      public void setOutputValueClass(Class<?> theClass
957                                      ) throws IllegalStateException {
958        ensureState(JobState.DEFINE);
959        conf.setOutputValueClass(theClass);
960      }
961    
962      /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to the combiner's
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)} method.
967       *
968       * @param cls the raw comparator to use
969       * @throws IllegalStateException if the job is submitted
970       */
971      public void setCombinerKeyGroupingComparatorClass(
972          Class<? extends RawComparator> cls) throws IllegalStateException {
973        ensureState(JobState.DEFINE);
974        conf.setCombinerKeyGroupingComparator(cls);
975      }
976    
977      /**
978       * Define the comparator that controls how the keys are sorted before they
979       * are passed to the {@link Reducer}.
980       * @param cls the raw comparator
981       * @throws IllegalStateException if the job is submitted
982       * @see #setCombinerKeyGroupingComparatorClass(Class)
983       */
984      public void setSortComparatorClass(Class<? extends RawComparator> cls
985                                         ) throws IllegalStateException {
986        ensureState(JobState.DEFINE);
987        conf.setOutputKeyComparatorClass(cls);
988      }
989    
990      /**
991       * Define the comparator that controls which keys are grouped together
992       * for a single call to 
993       * {@link Reducer#reduce(Object, Iterable, 
994       *                       org.apache.hadoop.mapreduce.Reducer.Context)}
995       * @param cls the raw comparator to use
996       * @throws IllegalStateException if the job is submitted
997       * @see #setCombinerKeyGroupingComparatorClass(Class)
998       */
999      public void setGroupingComparatorClass(Class<? extends RawComparator> cls
1000                                             ) throws IllegalStateException {
1001        ensureState(JobState.DEFINE);
1002        conf.setOutputValueGroupingComparator(cls);
1003      }
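
  /*
   * Illustrative sketch (not part of this class): a typical "secondary sort"
   * wires the sort and grouping comparators together. The comparator classes
   * named here are hypothetical.
   *
   *   job.setSortComparatorClass(CompositeKeyComparator.class);   // order by (key, value)
   *   job.setGroupingComparatorClass(NaturalKeyComparator.class); // group by key only
   *
   * All records sharing the natural key then reach a single reduce() call,
   * with their values already ordered by the secondary field.
   */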
1004    
1005      /**
1006       * Set the user-specified job name.
1007       * 
1008       * @param name the job's new name.
1009       * @throws IllegalStateException if the job is submitted
1010       */
1011      public void setJobName(String name) throws IllegalStateException {
1012        ensureState(JobState.DEFINE);
1013        conf.setJobName(name);
1014      }
1015    
1016      /**
1017       * Turn speculative execution on or off for this job. 
1018       * 
1019       * @param speculativeExecution <code>true</code> if speculative execution 
1020       *                             should be turned on, else <code>false</code>.
1021       */
1022      public void setSpeculativeExecution(boolean speculativeExecution) {
1023        ensureState(JobState.DEFINE);
1024        conf.setSpeculativeExecution(speculativeExecution);
1025      }
1026    
1027      /**
1028       * Turn speculative execution on or off for this job for map tasks. 
1029       * 
1030       * @param speculativeExecution <code>true</code> if speculative execution 
1031       *                             should be turned on for map tasks,
1032       *                             else <code>false</code>.
1033       */
1034      public void setMapSpeculativeExecution(boolean speculativeExecution) {
1035        ensureState(JobState.DEFINE);
1036        conf.setMapSpeculativeExecution(speculativeExecution);
1037      }
1038    
1039      /**
1040       * Turn speculative execution on or off for this job for reduce tasks. 
1041       * 
1042       * @param speculativeExecution <code>true</code> if speculative execution 
1043       *                             should be turned on for reduce tasks,
1044       *                             else <code>false</code>.
1045       */
1046      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1047        ensureState(JobState.DEFINE);
1048        conf.setReduceSpeculativeExecution(speculativeExecution);
1049      }
1050    
1051      /**
   * Specify whether job-setup and job-cleanup are needed for the job.
1053       * 
1054       * @param needed If <code>true</code>, job-setup and job-cleanup will be
1055       *               considered from {@link OutputCommitter} 
1056       *               else ignored.
1057       */
1058      public void setJobSetupCleanupNeeded(boolean needed) {
1059        ensureState(JobState.DEFINE);
1060        conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
1061      }
1062    
1063      /**
1064       * Set the given set of archives
1065       * @param archives The list of archives that need to be localized
1066       */
1067      public void setCacheArchives(URI[] archives) {
1068        ensureState(JobState.DEFINE);
1069        DistributedCache.setCacheArchives(archives, conf);
1070      }
1071    
1072      /**
1073       * Set the given set of files
1074       * @param files The list of files that need to be localized
1075       */
1076      public void setCacheFiles(URI[] files) {
1077        ensureState(JobState.DEFINE);
1078        DistributedCache.setCacheFiles(files, conf);
1079      }
1080    
1081      /**
   * Add an archive to be localized.
   * @param uri The uri of the archive to be localized
1084       */
1085      public void addCacheArchive(URI uri) {
1086        ensureState(JobState.DEFINE);
1087        DistributedCache.addCacheArchive(uri, conf);
1088      }
1089      
1090      /**
   * Add a file to be localized.
   * @param uri The uri of the file to be localized
1093       */
1094      public void addCacheFile(URI uri) {
1095        ensureState(JobState.DEFINE);
1096        DistributedCache.addCacheFile(uri, conf);
1097      }
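
  /*
   * Illustrative sketch (not part of this class): a URI fragment names the
   * symlink under which the cached file appears in a task's working
   * directory. The path is hypothetical, and URI construction throws a
   * checked URISyntaxException.
   *
   *   job.addCacheFile(new URI("/user/alice/lookup.txt#lookup"));
   *   // tasks can then read the file via new File("lookup")
   */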
1098    
1099      /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
1102       * 
1103       * Files added with this method will not be unpacked while being added to the
1104       * classpath.
1105       * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
1106       * method instead.
1107       *
1108       * @param file Path of the file to be added
1109       */
1110      public void addFileToClassPath(Path file)
1111        throws IOException {
1112        ensureState(JobState.DEFINE);
1113        DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
1114      }
1115    
1116      /**
1117       * Add an archive path to the current set of classpath entries. It adds the
1118       * archive to cache as well.
1119       * 
1120       * Archive files will be unpacked and added to the classpath
1121       * when being distributed.
1122       *
1123       * @param archive Path of the archive to be added
1124       */
1125      public void addArchiveToClassPath(Path archive)
1126        throws IOException {
1127        ensureState(JobState.DEFINE);
1128        DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
1129      }
1130    
1131      /**
1132       * Originally intended to enable symlinks, but currently symlinks cannot be
1133       * disabled.
1134       */
1135      @Deprecated
1136      public void createSymlink() {
1137        ensureState(JobState.DEFINE);
1138        DistributedCache.createSymlink(conf);
1139      }
1140      
1141      /** 
1142       * Expert: Set the number of maximum attempts that will be made to run a
1143       * map task.
1144       * 
1145       * @param n the number of attempts per map task.
1146       */
1147      public void setMaxMapAttempts(int n) {
1148        ensureState(JobState.DEFINE);
1149        conf.setMaxMapAttempts(n);
1150      }
1151    
1152      /** 
1153       * Expert: Set the number of maximum attempts that will be made to run a
1154       * reduce task.
1155       * 
1156       * @param n the number of attempts per reduce task.
1157       */
1158      public void setMaxReduceAttempts(int n) {
1159        ensureState(JobState.DEFINE);
1160        conf.setMaxReduceAttempts(n);
1161      }
1162    
1163      /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
1166       * directory.
1167       * @param newValue true means it should be gathered
1168       */
1169      public void setProfileEnabled(boolean newValue) {
1170        ensureState(JobState.DEFINE);
1171        conf.setProfileEnabled(newValue);
1172      }
1173    
1174      /**
1175       * Set the profiler configuration arguments. If the string contains a '%s' it
1176       * will be replaced with the name of the profiling output file when the task
1177       * runs.
1178       *
1179       * This value is passed to the task child JVM on the command line.
1180       *
1181       * @param value the configuration string
1182       */
1183      public void setProfileParams(String value) {
1184        ensureState(JobState.DEFINE);
1185        conf.setProfileParams(value);
1186      }
1187    
1188      /**
1189       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1190       * must also be called.
   * @param isMap true to set the ranges for map tasks, false for reduce tasks
   * @param newValue a set of integer ranges of the task ids, e.g. "0-2,5"
1192       */
1193      public void setProfileTaskRange(boolean isMap, String newValue) {
1194        ensureState(JobState.DEFINE);
1195        conf.setProfileTaskRange(isMap, newValue);
1196      }
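
  /*
   * Illustrative sketch (not part of this class): enabling hprof profiling
   * for the first three map tasks. The profile params string follows the
   * usual hprof syntax, with %s replaced by the output file name.
   *
   *   job.setProfileEnabled(true);
   *   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,"
   *       + "force=n,thread=y,verbose=n,file=%s");
   *   job.setProfileTaskRange(true, "0-2");  // map tasks 0, 1 and 2
   */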
1197    
1198      private void ensureNotSet(String attr, String msg) throws IOException {
1199        if (conf.get(attr) != null) {
1200          throw new IOException(attr + " is incompatible with " + msg + " mode.");
1201        }    
1202      }
1203      
1204      /**
1205       * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
1206       * tokens upon job completion. Defaults to true.
1207       */
1208      public void setCancelDelegationTokenUponJobCompletion(boolean value) {
1209        ensureState(JobState.DEFINE);
1210        conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
1211      }
1212    
1213      /**
1214       * Default to the new APIs unless they are explicitly set or the old mapper or
1215       * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
1217       */
1218      private void setUseNewAPI() throws IOException {
1219        int numReduces = conf.getNumReduceTasks();
1220        String oldMapperClass = "mapred.mapper.class";
1221        String oldReduceClass = "mapred.reducer.class";
1222        conf.setBooleanIfUnset("mapred.mapper.new-api",
1223                               conf.get(oldMapperClass) == null);
1224        if (conf.getUseNewMapper()) {
1225          String mode = "new map API";
1226          ensureNotSet("mapred.input.format.class", mode);
1227          ensureNotSet(oldMapperClass, mode);
1228          if (numReduces != 0) {
1229            ensureNotSet("mapred.partitioner.class", mode);
1230           } else {
1231            ensureNotSet("mapred.output.format.class", mode);
1232          }      
1233        } else {
      String mode = "map compatibility";
1235          ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
1236          ensureNotSet(MAP_CLASS_ATTR, mode);
1237          if (numReduces != 0) {
1238            ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
1239           } else {
1240            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1241          }
1242        }
1243        if (numReduces != 0) {
1244          conf.setBooleanIfUnset("mapred.reducer.new-api",
1245                                 conf.get(oldReduceClass) == null);
1246          if (conf.getUseNewReducer()) {
1247            String mode = "new reduce API";
1248            ensureNotSet("mapred.output.format.class", mode);
1249            ensureNotSet(oldReduceClass, mode);   
1250          } else {
        String mode = "reduce compatibility";
1252            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1253            ensureNotSet(REDUCE_CLASS_ATTR, mode);   
1254          }
1255        }   
1256      }
1257    
1258      private synchronized void connect()
1259              throws IOException, InterruptedException, ClassNotFoundException {
1260        if (cluster == null) {
1261          cluster = 
1262            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
1263                       public Cluster run()
1264                              throws IOException, InterruptedException, 
1265                                     ClassNotFoundException {
1266                         return new Cluster(getConfiguration());
1267                       }
1268                     });
1269        }
1270      }
1271    
1272      boolean isConnected() {
1273        return cluster != null;
1274      }
1275    
1276      /** Only for mocking via unit tests. */
1277      @Private
1278      public JobSubmitter getJobSubmitter(FileSystem fs, 
1279          ClientProtocol submitClient) throws IOException {
1280        return new JobSubmitter(fs, submitClient);
1281      }
1282      /**
1283       * Submit the job to the cluster and return immediately.
1284       * @throws IOException
1285       */
1286      public void submit() 
1287             throws IOException, InterruptedException, ClassNotFoundException {
1288        ensureState(JobState.DEFINE);
1289        setUseNewAPI();
1290        connect();
1291        final JobSubmitter submitter = 
1292            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
1293        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
1294          public JobStatus run() throws IOException, InterruptedException, 
1295          ClassNotFoundException {
1296            return submitter.submitJobInternal(Job.this, cluster);
1297          }
1298        });
1299        state = JobState.RUNNING;
1300        LOG.info("The url to track the job: " + getTrackingURL());
  }
1302      
1303      /**
1304       * Submit the job to the cluster and wait for it to finish.
1305       * @param verbose print the progress to the user
1306       * @return true if the job succeeded
1307       * @throws IOException thrown if the communication with the 
1308       *         <code>JobTracker</code> is lost
1309       */
1310      public boolean waitForCompletion(boolean verbose
1311                                       ) throws IOException, InterruptedException,
1312                                                ClassNotFoundException {
1313        if (state == JobState.DEFINE) {
1314          submit();
1315        }
1316        if (verbose) {
1317          monitorAndPrintJob();
1318        } else {
1319          // get the completion poll interval from the client.
1320          int completionPollIntervalMillis = 
1321            Job.getCompletionPollInterval(cluster.getConf());
1322          while (!isComplete()) {
1323            try {
1324              Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // ignore the interruption and keep polling until the job completes
        }
1327          }
1328        }
1329        return isSuccessful();
1330      }
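
  /*
   * Illustrative sketch (not part of this class): a typical driver derives
   * its exit code from the job outcome. With verbose set to false the client
   * polls quietly at the interval configured by
   * mapreduce.client.completion.pollinterval.
   *
   *   System.exit(job.waitForCompletion(true) ? 0 : 1);
   */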
1331      
1332      /**
1333       * Monitor a job and print status in real-time as progress is made and tasks 
1334       * fail.
1335       * @return true if the job succeeded
1336       * @throws IOException if communication to the JobTracker fails
1337       */
1338      public boolean monitorAndPrintJob() 
1339          throws IOException, InterruptedException {
1340        String lastReport = null;
1341        Job.TaskStatusFilter filter;
1342        Configuration clientConf = getConfiguration();
1343        filter = Job.getTaskOutputFilter(clientConf);
1344        JobID jobId = getJobID();
1345        LOG.info("Running job: " + jobId);
1346        int eventCounter = 0;
1347        boolean profiling = getProfileEnabled();
1348        IntegerRanges mapRanges = getProfileTaskRange(true);
1349        IntegerRanges reduceRanges = getProfileTaskRange(false);
1350        int progMonitorPollIntervalMillis = 
1351          Job.getProgressPollInterval(clientConf);
1352        /* make sure to report full progress after the job is done */
1353        boolean reportedAfterCompletion = false;
1354        boolean reportedUberMode = false;
1355        while (!isComplete() || !reportedAfterCompletion) {
1356          if (isComplete()) {
1357            reportedAfterCompletion = true;
1358          } else {
1359            Thread.sleep(progMonitorPollIntervalMillis);
1360          }
1361          if (status.getState() == JobStatus.State.PREP) {
1362            continue;
1363          }      
1364          if (!reportedUberMode) {
1365            reportedUberMode = true;
1366            LOG.info("Job " + jobId + " running in uber mode : " + isUber());
1367          }      
1368          String report = 
1369            (" map " + StringUtils.formatPercent(mapProgress(), 0)+
1370                " reduce " + 
1371                StringUtils.formatPercent(reduceProgress(), 0));
1372          if (!report.equals(lastReport)) {
1373            LOG.info(report);
1374            lastReport = report;
1375          }
1376    
1377          TaskCompletionEvent[] events = 
1378            getTaskCompletionEvents(eventCounter, 10); 
1379          eventCounter += events.length;
1380          printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
1381        }
1382        boolean success = isSuccessful();
1383        if (success) {
1384          LOG.info("Job " + jobId + " completed successfully");
1385        } else {
1386          LOG.info("Job " + jobId + " failed with state " + status.getState() + 
1387              " due to: " + status.getFailureInfo());
1388        }
1389        Counters counters = getCounters();
1390        if (counters != null) {
1391          LOG.info(counters.toString());
1392        }
1393        return success;
1394      }
1395    
1396      /**
1397       * @return true if the profile parameters indicate that this is using
1398       * hprof, which generates profile files in a particular location
1399       * that we can retrieve to the client.
1400       */
1401      private boolean shouldDownloadProfile() {
1402        // Check the argument string that was used to initialize profiling.
1403        // If this indicates hprof and file-based output, then we're ok to
1404        // download.
1405        String profileParams = getProfileParams();
1406    
1407        if (null == profileParams) {
1408          return false;
1409        }
1410    
1411        // Split this on whitespace.
1412        String [] parts = profileParams.split("[ \\t]+");
1413    
1414        // If any of these indicate hprof, and the use of output files, return true.
1415        boolean hprofFound = false;
1416        boolean fileFound = false;
1417        for (String p : parts) {
1418          if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
1419            hprofFound = true;
1420    
1421            // This contains a number of comma-delimited components, one of which
1422            // may specify the file to write to. Make sure this is present and
1423            // not empty.
1424            String [] subparts = p.split(",");
1425            for (String sub : subparts) {
1426              if (sub.startsWith("file=") && sub.length() != "file=".length()) {
1427                fileFound = true;
1428              }
1429            }
1430          }
1431        }
1432    
1433        return hprofFound && fileFound;
1434      }
1435    
1436      private void printTaskEvents(TaskCompletionEvent[] events,
1437          Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
1438          IntegerRanges reduceRanges) throws IOException, InterruptedException {
1439        for (TaskCompletionEvent event : events) {
1440          switch (filter) {
1441          case NONE:
1442            break;
1443          case SUCCEEDED:
1444            if (event.getStatus() == 
1445              TaskCompletionEvent.Status.SUCCEEDED) {
1446              LOG.info(event.toString());
1447            }
1448            break; 
1449          case FAILED:
1450            if (event.getStatus() == 
1451              TaskCompletionEvent.Status.FAILED) {
1452              LOG.info(event.toString());
1453              // Displaying the task diagnostic information
1454              TaskAttemptID taskId = event.getTaskAttemptId();
1455              String[] taskDiagnostics = getTaskDiagnostics(taskId); 
1456              if (taskDiagnostics != null) {
1457                for (String diagnostics : taskDiagnostics) {
1458                  System.err.println(diagnostics);
1459                }
1460              }
1461            }
1462            break; 
1463          case KILLED:
1464            if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
1465              LOG.info(event.toString());
1466            }
1467            break; 
1468          case ALL:
1469            LOG.info(event.toString());
1470            break;
1471          }
1472        }
1473      }
1474    
  /** The interval at which monitorAndPrintJob() prints status. */
1476      public static int getProgressPollInterval(Configuration conf) {
1477        // Read progress monitor poll interval from config. Default is 1 second.
1478        int progMonitorPollIntervalMillis = conf.getInt(
1479          PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
1480        if (progMonitorPollIntervalMillis < 1) {
1481          LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
1482            " has been set to an invalid value; "
        + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
1484          progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
1485        }
1486        return progMonitorPollIntervalMillis;
1487      }
1488    
1489      /** The interval at which waitForCompletion() should check. */
1490      public static int getCompletionPollInterval(Configuration conf) {
1491        int completionPollIntervalMillis = conf.getInt(
1492          COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
1493        if (completionPollIntervalMillis < 1) { 
1494          LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
1495           " has been set to an invalid value; "
1496           + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
1497          completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
1498        }
1499        return completionPollIntervalMillis;
1500      }
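
  /*
   * Illustrative sketch (not part of this class): both poll intervals are
   * plain configuration values, so a client may tune them before creating
   * the job:
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 1000);      // 1 second
   *   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 500); // 0.5 seconds
   *   Job job = Job.getInstance(conf);
   */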
1501    
1502      /**
1503       * Get the task output filter.
1504       * 
1505       * @param conf the configuration.
1506       * @return the filter level.
1507       */
1508      public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
1509        return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
1510      }
1511    
1512      /**
1513       * Modify the Configuration to set the task output filter.
1514       * 
1515       * @param conf the Configuration to modify.
1516       * @param newValue the value to set.
1517       */
1518      public static void setTaskOutputFilter(Configuration conf, 
1519          TaskStatusFilter newValue) {
1520        conf.set(Job.OUTPUT_FILTER, newValue.toString());
1521      }
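
  /*
   * Illustrative sketch (not part of this class): asking monitorAndPrintJob()
   * to log every task completion event rather than only failed ones (the
   * default is FAILED):
   *
   *   Job.setTaskOutputFilter(job.getConfiguration(),
   *       Job.TaskStatusFilter.ALL);
   */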
1522    
1523      public boolean isUber() throws IOException, InterruptedException {
1524        ensureState(JobState.RUNNING);
1525        updateStatus();
1526        return status.isUber();
1527      }
1528    
1529      /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation, or null if
   *         the job does not have any reservation associated with it
1534       */
1535      public ReservationId getReservationId() {
1536        return reservationId;
1537      }
1538    
1539      /**
   * Set the reservation to which the job is submitted.
1541       *
1542       * @param reservationId the reservationId to set
1543       */
1544      public void setReservationId(ReservationId reservationId) {
1545        this.reservationId = reservationId;
1546      }
1547      
1548    }