001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    
022    import java.io.IOException;
023    import java.util.regex.Pattern;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceAudience.Private;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileStatus;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.LongWritable;
035    import org.apache.hadoop.io.RawComparator;
036    import org.apache.hadoop.io.Text;
037    import org.apache.hadoop.io.WritableComparable;
038    import org.apache.hadoop.io.WritableComparator;
039    import org.apache.hadoop.io.compress.CompressionCodec;
040    import org.apache.hadoop.mapred.lib.HashPartitioner;
041    import org.apache.hadoop.mapred.lib.IdentityMapper;
042    import org.apache.hadoop.mapred.lib.IdentityReducer;
043    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044    import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045    import org.apache.hadoop.mapreduce.MRConfig;
046    import org.apache.hadoop.mapreduce.MRJobConfig;
047    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048    import org.apache.hadoop.mapreduce.util.ConfigUtil;
049    import org.apache.hadoop.security.Credentials;
050    import org.apache.hadoop.util.ClassUtil;
051    import org.apache.hadoop.util.ReflectionUtils;
052    import org.apache.hadoop.util.Tool;
053    import org.apache.log4j.Level;
054    
055    /** 
056     * A map/reduce job configuration.
057     * 
058     * <p><code>JobConf</code> is the primary interface for a user to describe a 
059     * map-reduce job to the Hadoop framework for execution. The framework tries to
060     * faithfully execute the job as-is described by <code>JobConf</code>, however:
061     * <ol>
062     *   <li>
063     *   Some configuration parameters might have been marked as 
064     *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065     *   final</a> by administrators and hence cannot be altered.
066     *   </li>
067     *   <li>
 *   While some job parameters are straight-forward to set 
 *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 
 *   with the rest of the framework and/or job-configuration and are relatively 
 *   more complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
072     *   </li>
073     * </ol></p>
074     * 
075     * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
076     * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
077     * {@link OutputFormat} implementations to be used etc.
078     *
079     * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
080     * of the job such as <code>Comparator</code>s to be used, files to be put in  
081     * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
 * are to be compressed (and how), debuggability via user-provided scripts 
 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for doing post-processing on task logs, task's stdout, stderr, syslog, 
 * etc.</p>
086     * 
087     * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
088     * <p><blockquote><pre>
089     *     // Create a new JobConf
090     *     JobConf job = new JobConf(new Configuration(), MyJob.class);
091     *     
092     *     // Specify various job-specific parameters     
093     *     job.setJobName("myjob");
094     *     
095     *     FileInputFormat.setInputPaths(job, new Path("in"));
096     *     FileOutputFormat.setOutputPath(job, new Path("out"));
097     *     
098     *     job.setMapperClass(MyJob.MyMapper.class);
099     *     job.setCombinerClass(MyJob.MyReducer.class);
100     *     job.setReducerClass(MyJob.MyReducer.class);
101     *     
102     *     job.setInputFormat(SequenceFileInputFormat.class);
103     *     job.setOutputFormat(SequenceFileOutputFormat.class);
104     * </pre></blockquote></p>
105     * 
106     * @see JobClient
107     * @see ClusterStatus
108     * @see Tool
109     * @see DistributedCache
110     */
111    @InterfaceAudience.Public
112    @InterfaceStability.Stable
113    public class JobConf extends Configuration {
114      
  /** Logger used for deprecation warnings and memory-parameter diagnostics. */
  private static final Log LOG = LogFactory.getLog(JobConf.class);

  // Register the MapReduce configuration resources and deprecated-key
  // mappings (see ConfigUtil.loadResources) before any JobConf is created.
  static{
    ConfigUtil.loadResources();
  }
120    
121      /**
122       * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
123       * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
124       */
125      @Deprecated
126      public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
127        "mapred.task.maxvmem";
128    
129      /**
130       * @deprecated 
131       */
132      @Deprecated
133      public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
134        "mapred.task.limit.maxvmem";
135    
136      /**
137       * @deprecated
138       */
139      @Deprecated
140      public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
141        "mapred.task.default.maxvmem";
142    
143      /**
144       * @deprecated
145       */
146      @Deprecated
147      public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
148        "mapred.task.maxpmem";
149    
150      /**
151       * A value which if set for memory related configuration options,
152       * indicates that the options are turned off.
153       */
154      public static final long DISABLED_MEMORY_LIMIT = -1L;
155    
156      /**
157       * Property name for the configuration property mapreduce.cluster.local.dir
158       */
159      public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
160    
161      /**
162       * Name of the queue to which jobs will be submitted, if no queue
163       * name is mentioned.
164       */
165      public static final String DEFAULT_QUEUE_NAME = "default";
166    
167      static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
168          JobContext.MAP_MEMORY_MB;
169    
170      static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
171        JobContext.REDUCE_MEMORY_MB;
172    
173      /**
174       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
175       * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
176       */
177      @Deprecated
178      public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
179          "mapred.job.map.memory.mb";
180    
181      /**
182       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
183       * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
184       */
185      @Deprecated
186      public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
187          "mapred.job.reduce.memory.mb";
188    
189      /** Pattern for the default unpacking behavior for job jars */
190      public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
191        Pattern.compile("(?:classes/|lib/).*");
192    
193      /**
194       * Configuration key to set the java command line options for the child
195       * map and reduce tasks.
196       * 
197       * Java opts for the task tracker child processes.
198       * The following symbol, if present, will be interpolated: @taskid@. 
199       * It is replaced by current TaskID. Any other occurrences of '@' will go 
200       * unchanged.
201       * For example, to enable verbose gc logging to a file named for the taskid in
202       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
203       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
204       * 
205       * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
206       * other environment variables to the child processes.
207       * 
208       * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
209       *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
210       */
211      @Deprecated
212      public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
213      
214      /**
215       * Configuration key to set the java command line options for the map tasks.
216       * 
217       * Java opts for the task tracker child map processes.
218       * The following symbol, if present, will be interpolated: @taskid@. 
219       * It is replaced by current TaskID. Any other occurrences of '@' will go 
220       * unchanged.
221       * For example, to enable verbose gc logging to a file named for the taskid in
222       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
223       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
224       * 
225       * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
226       * other environment variables to the map processes.
227       */
228      public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
229        JobContext.MAP_JAVA_OPTS;
230      
231      /**
232       * Configuration key to set the java command line options for the reduce tasks.
233       * 
234       * Java opts for the task tracker child reduce processes.
235       * The following symbol, if present, will be interpolated: @taskid@. 
236       * It is replaced by current TaskID. Any other occurrences of '@' will go 
237       * unchanged.
238       * For example, to enable verbose gc logging to a file named for the taskid in
239       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
240       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
241       * 
242       * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
243       * pass process environment variables to the reduce processes.
244       */
245      public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
246        JobContext.REDUCE_JAVA_OPTS;
247      
248      public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
249      
250      /**
251       * @deprecated
252       * Configuration key to set the maximum virtual memory available to the child
253       * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
254       * longer have any effect.
255       */
256      @Deprecated
257      public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
258    
259      /**
260       * @deprecated
261       * Configuration key to set the maximum virtual memory available to the
262       * map tasks (in kilo-bytes). This has been deprecated and will no
263       * longer have any effect.
264       */
265      @Deprecated
266      public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
267      
268      /**
269       * @deprecated
270       * Configuration key to set the maximum virtual memory available to the
271       * reduce tasks (in kilo-bytes). This has been deprecated and will no
272       * longer have any effect.
273       */
274      @Deprecated
275      public static final String MAPRED_REDUCE_TASK_ULIMIT =
276        "mapreduce.reduce.ulimit";
277    
278    
279      /**
280       * Configuration key to set the environment of the child map/reduce tasks.
281       * 
282       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
283       * reference existing environment variables via <code>$key</code> on
284       * Linux or <code>%key%</code> on Windows.
285       * 
286       * Example:
287       * <ul>
288       *   <li> A=foo - This will set the env variable A to foo. </li>
289       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
290       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
291       * </ul>
292       * 
293       * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
294       *                 {@link #MAPRED_REDUCE_TASK_ENV}
295       */
296      @Deprecated
297      public static final String MAPRED_TASK_ENV = "mapred.child.env";
298    
299      /**
300       * Configuration key to set the environment of the child map tasks.
301       * 
302       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
303       * reference existing environment variables via <code>$key</code> on
304       * Linux or <code>%key%</code> on Windows.
305       * 
306       * Example:
307       * <ul>
308       *   <li> A=foo - This will set the env variable A to foo. </li>
309       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
310       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
311       * </ul>
312       */
313      public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
314      
315      /**
316       * Configuration key to set the environment of the child reduce tasks.
317       * 
318       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
319       * reference existing environment variables via <code>$key</code> on
320       * Linux or <code>%key%</code> on Windows.
321       * 
322       * Example:
323       * <ul>
324       *   <li> A=foo - This will set the env variable A to foo. </li>
325       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
326       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
327       * </ul>
328       */
329      public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
330    
331      private Credentials credentials = new Credentials();
332      
333      /**
334       * Configuration key to set the logging {@link Level} for the map task.
335       *
336       * The allowed logging levels are:
337       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
338       */
339      public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
340        JobContext.MAP_LOG_LEVEL;
341      
342      /**
343       * Configuration key to set the logging {@link Level} for the reduce task.
344       *
345       * The allowed logging levels are:
346       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
347       */
348      public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
349        JobContext.REDUCE_LOG_LEVEL;
350      
351      /**
352       * Default logging level for map/reduce tasks.
353       */
354      public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
355    
356      /**
357       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
358       * use {@link MRJobConfig#WORKFLOW_ID} instead
359       */
360      @Deprecated
361      public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
362    
363      /**
364       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
365       * use {@link MRJobConfig#WORKFLOW_NAME} instead
366       */
367      @Deprecated
368      public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
369    
370      /**
371       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
372       * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
373       */
374      @Deprecated
375      public static final String WORKFLOW_NODE_NAME =
376          MRJobConfig.WORKFLOW_NODE_NAME;
377    
378      /**
379       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
380       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
381       */
382      @Deprecated
383      public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
384          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
385    
386      /**
387       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
388       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
389       */
390      @Deprecated
391      public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
392          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
393    
394      /**
395       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
396       * use {@link MRJobConfig#WORKFLOW_TAGS} instead
397       */
398      @Deprecated
399      public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
400    
401      /**
402       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
403       * not use it
404       */
405      @Deprecated
406      public static final String MAPREDUCE_RECOVER_JOB =
407          "mapreduce.job.restart.recover";
408    
409      /**
410       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
411       * not use it
412       */
413      @Deprecated
414      public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
415    
416      /**
417       * Construct a map/reduce job configuration.
418       */
419      public JobConf() {
420        checkAndWarnDeprecation();
421      }
422    
423      /** 
424       * Construct a map/reduce job configuration.
425       * 
426       * @param exampleClass a class whose containing jar is used as the job's jar.
427       */
428      public JobConf(Class exampleClass) {
429        setJarByClass(exampleClass);
430        checkAndWarnDeprecation();
431      }
432      
433      /**
434       * Construct a map/reduce job configuration.
435       * 
436       * @param conf a Configuration whose settings will be inherited.
437       */
438      public JobConf(Configuration conf) {
439        super(conf);
440        
441        if (conf instanceof JobConf) {
442          JobConf that = (JobConf)conf;
443          credentials = that.credentials;
444        }
445        
446        checkAndWarnDeprecation();
447      }
448    
449    
  /** Construct a map/reduce job configuration.
   * 
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    // The XML file is added as a configuration resource, overriding defaults.
    addResource(config);
    checkAndWarnDeprecation();
  }
478    
  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }
491    
492      /**
493       * Get credentials for the job.
494       * @return credentials for the job
495       */
496      public Credentials getCredentials() {
497        return credentials;
498      }
499      
500      @Private
501      public void setCredentials(Credentials credentials) {
502        this.credentials = credentials;
503      }
504      
505      /**
506       * Get the user jar for the map-reduce job.
507       * 
508       * @return the user jar for the map-reduce job.
509       */
510      public String getJar() { return get(JobContext.JAR); }
511      
512      /**
513       * Set the user jar for the map-reduce job.
514       * 
515       * @param jar the user jar for the map-reduce job.
516       */
517      public void setJar(String jar) { set(JobContext.JAR, jar); }
518    
519      /**
520       * Get the pattern for jar contents to unpack on the tasktracker
521       */
522      public Pattern getJarUnpackPattern() {
523        return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
524      }
525    
526      
527      /**
528       * Set the job's jar file by finding an example class location.
529       * 
530       * @param cls the example class.
531       */
532      public void setJarByClass(Class cls) {
533        String jar = ClassUtil.findContainingJar(cls);
534        if (jar != null) {
535          setJar(jar);
536        }   
537      }
538    
  /**
   * Get the configured local directories ({@link MRConfig#LOCAL_DIR}),
   * trimmed of surrounding whitespace.
   *
   * @return the local directory names; may be empty.
   * @throws IOException declared for API compatibility.
   */
  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }
542    
543      /**
544       * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
545       */
546      @Deprecated
547      public void deleteLocalFiles() throws IOException {
548        String[] localDirs = getLocalDirs();
549        for (int i = 0; i < localDirs.length; i++) {
550          FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
551        }
552      }
553    
554      public void deleteLocalFiles(String subdir) throws IOException {
555        String[] localDirs = getLocalDirs();
556        for (int i = 0; i < localDirs.length; i++) {
557          FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
558        }
559      }
560    
561      /** 
562       * Constructs a local file name. Files are distributed among configured
563       * local directories.
564       */
565      public Path getLocalPath(String pathString) throws IOException {
566        return getLocalPath(MRConfig.LOCAL_DIR, pathString);
567      }
568    
569      /**
570       * Get the reported username for this job.
571       * 
572       * @return the username
573       */
574      public String getUser() {
575        return get(JobContext.USER_NAME);
576      }
577      
578      /**
579       * Set the reported username for this job.
580       * 
581       * @param user the username for this job.
582       */
583      public void setUser(String user) {
584        set(JobContext.USER_NAME, user);
585      }
586    
587    
588      
589      /**
590       * Set whether the framework should keep the intermediate files for 
591       * failed tasks.
592       * 
593       * @param keep <code>true</code> if framework should keep the intermediate files 
594       *             for failed tasks, <code>false</code> otherwise.
595       * 
596       */
597      public void setKeepFailedTaskFiles(boolean keep) {
598        setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
599      }
600      
601      /**
602       * Should the temporary files for failed tasks be kept?
603       * 
604       * @return should the files be kept?
605       */
606      public boolean getKeepFailedTaskFiles() {
607        return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
608      }
609      
610      /**
611       * Set a regular expression for task names that should be kept. 
612       * The regular expression ".*_m_000123_0" would keep the files
613       * for the first instance of map 123 that ran.
614       * 
615       * @param pattern the java.util.regex.Pattern to match against the 
616       *        task names.
617       */
618      public void setKeepTaskFilesPattern(String pattern) {
619        set(JobContext.PRESERVE_FILES_PATTERN, pattern);
620      }
621      
622      /**
623       * Get the regular expression that is matched against the task names
624       * to see if we need to keep the files.
625       * 
626       * @return the pattern as a string, if it was set, othewise null.
627       */
628      public String getKeepTaskFilesPattern() {
629        return get(JobContext.PRESERVE_FILES_PATTERN);
630      }
631      
632      /**
633       * Set the current working directory for the default file system.
634       * 
635       * @param dir the new current working directory.
636       */
637      public void setWorkingDirectory(Path dir) {
638        dir = new Path(getWorkingDirectory(), dir);
639        set(JobContext.WORKING_DIR, dir.toString());
640      }
641      
642      /**
643       * Get the current working directory for the default file system.
644       * 
645       * @return the directory name.
646       */
647      public Path getWorkingDirectory() {
648        String name = get(JobContext.WORKING_DIR);
649        if (name != null) {
650          return new Path(name);
651        } else {
652          try {
653            Path dir = FileSystem.get(this).getWorkingDirectory();
654            set(JobContext.WORKING_DIR, dir.toString());
655            return dir;
656          } catch (IOException e) {
657            throw new RuntimeException(e);
658          }
659        }
660      }
661      
662      /**
663       * Sets the number of tasks that a spawned task JVM should run
664       * before it exits
665       * @param numTasks the number of tasks to execute; defaults to 1;
666       * -1 signifies no limit
667       */
668      public void setNumTasksToExecutePerJvm(int numTasks) {
669        setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
670      }
671      
672      /**
673       * Get the number of tasks that a spawned JVM should execute
674       */
675      public int getNumTasksToExecutePerJvm() {
676        return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
677      }
678      
679      /**
680       * Get the {@link InputFormat} implementation for the map-reduce job,
681       * defaults to {@link TextInputFormat} if not specified explicity.
682       * 
683       * @return the {@link InputFormat} implementation for the map-reduce job.
684       */
685      public InputFormat getInputFormat() {
686        return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
687                                                                 TextInputFormat.class,
688                                                                 InputFormat.class),
689                                                        this);
690      }
691      
692      /**
693       * Set the {@link InputFormat} implementation for the map-reduce job.
694       * 
695       * @param theClass the {@link InputFormat} implementation for the map-reduce 
696       *                 job.
697       */
698      public void setInputFormat(Class<? extends InputFormat> theClass) {
699        setClass("mapred.input.format.class", theClass, InputFormat.class);
700      }
701      
702      /**
703       * Get the {@link OutputFormat} implementation for the map-reduce job,
704       * defaults to {@link TextOutputFormat} if not specified explicity.
705       * 
706       * @return the {@link OutputFormat} implementation for the map-reduce job.
707       */
708      public OutputFormat getOutputFormat() {
709        return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
710                                                                  TextOutputFormat.class,
711                                                                  OutputFormat.class),
712                                                         this);
713      }
714    
715      /**
716       * Get the {@link OutputCommitter} implementation for the map-reduce job,
717       * defaults to {@link FileOutputCommitter} if not specified explicitly.
718       * 
719       * @return the {@link OutputCommitter} implementation for the map-reduce job.
720       */
721      public OutputCommitter getOutputCommitter() {
722        return (OutputCommitter)ReflectionUtils.newInstance(
723          getClass("mapred.output.committer.class", FileOutputCommitter.class,
724                   OutputCommitter.class), this);
725      }
726    
727      /**
728       * Set the {@link OutputCommitter} implementation for the map-reduce job.
729       * 
730       * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
731       *                 job.
732       */
733      public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
734        setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
735      }
736      
737      /**
738       * Set the {@link OutputFormat} implementation for the map-reduce job.
739       * 
740       * @param theClass the {@link OutputFormat} implementation for the map-reduce 
741       *                 job.
742       */
743      public void setOutputFormat(Class<? extends OutputFormat> theClass) {
744        setClass("mapred.output.format.class", theClass, OutputFormat.class);
745      }
746    
747      /**
748       * Should the map outputs be compressed before transfer?
749       * Uses the SequenceFile compression.
750       * 
751       * @param compress should the map outputs be compressed?
752       */
753      public void setCompressMapOutput(boolean compress) {
754        setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
755      }
756      
757      /**
758       * Are the outputs of the maps be compressed?
759       * 
760       * @return <code>true</code> if the outputs of the maps are to be compressed,
761       *         <code>false</code> otherwise.
762       */
763      public boolean getCompressMapOutput() {
764        return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
765      }
766    
767      /**
768       * Set the given class as the  {@link CompressionCodec} for the map outputs.
769       * 
770       * @param codecClass the {@link CompressionCodec} class that will compress  
771       *                   the map outputs.
772       */
773      public void 
774      setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
775        setCompressMapOutput(true);
776        setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
777                 CompressionCodec.class);
778      }
779      
780      /**
781       * Get the {@link CompressionCodec} for compressing the map outputs.
782       * 
783       * @param defaultValue the {@link CompressionCodec} to return if not set
784       * @return the {@link CompressionCodec} class that should be used to compress the 
785       *         map outputs.
786       * @throws IllegalArgumentException if the class was specified, but not found
787       */
788      public Class<? extends CompressionCodec> 
789      getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
790        Class<? extends CompressionCodec> codecClass = defaultValue;
791        String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
792        if (name != null) {
793          try {
794            codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
795          } catch (ClassNotFoundException e) {
796            throw new IllegalArgumentException("Compression codec " + name + 
797                                               " was not found.", e);
798          }
799        }
800        return codecClass;
801      }
802      
803      /**
804       * Get the key class for the map output data. If it is not set, use the
805       * (final) output key class. This allows the map output key class to be
806       * different than the final output key class.
807       *  
808       * @return the map output key class.
809       */
810      public Class<?> getMapOutputKeyClass() {
811        Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
812        if (retv == null) {
813          retv = getOutputKeyClass();
814        }
815        return retv;
816      }
817      
818      /**
819       * Set the key class for the map output data. This allows the user to
820       * specify the map output key class to be different than the final output
821       * value class.
822       * 
823       * @param theClass the map output key class.
824       */
825      public void setMapOutputKeyClass(Class<?> theClass) {
826        setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
827      }
828      
829      /**
830       * Get the value class for the map output data. If it is not set, use the
831       * (final) output value class This allows the map output value class to be
832       * different than the final output value class.
833       *  
834       * @return the map output value class.
835       */
836      public Class<?> getMapOutputValueClass() {
837        Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
838            Object.class);
839        if (retv == null) {
840          retv = getOutputValueClass();
841        }
842        return retv;
843      }
844      
845      /**
846       * Set the value class for the map output data. This allows the user to
847       * specify the map output value class to be different than the final output
848       * value class.
849       * 
850       * @param theClass the map output value class.
851       */
852      public void setMapOutputValueClass(Class<?> theClass) {
853        setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
854      }
855      
856      /**
857       * Get the key class for the job output data.
858       * 
859       * @return the key class for the job output data.
860       */
861      public Class<?> getOutputKeyClass() {
862        return getClass(JobContext.OUTPUT_KEY_CLASS,
863                        LongWritable.class, Object.class);
864      }
865      
866      /**
867       * Set the key class for the job output data.
868       * 
869       * @param theClass the key class for the job output data.
870       */
871      public void setOutputKeyClass(Class<?> theClass) {
872        setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
873      }
874    
875      /**
876       * Get the {@link RawComparator} comparator used to compare keys.
877       * 
878       * @return the {@link RawComparator} comparator used to compare keys.
879       */
880      public RawComparator getOutputKeyComparator() {
881        Class<? extends RawComparator> theClass = getClass(
882          JobContext.KEY_COMPARATOR, null, RawComparator.class);
883        if (theClass != null)
884          return ReflectionUtils.newInstance(theClass, this);
885        return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
886      }
887    
888      /**
889       * Set the {@link RawComparator} comparator used to compare keys.
890       * 
891       * @param theClass the {@link RawComparator} comparator used to 
892       *                 compare keys.
893       * @see #setOutputValueGroupingComparator(Class)                 
894       */
895      public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
896        setClass(JobContext.KEY_COMPARATOR,
897                 theClass, RawComparator.class);
898      }
899    
900      /**
901       * Set the {@link KeyFieldBasedComparator} options used to compare keys.
902       * 
903       * @param keySpec the key specification of the form -k pos1[,pos2], where,
904       *  pos is of the form f[.c][opts], where f is the number
905       *  of the key field to use, and c is the number of the first character from
906       *  the beginning of the field. Fields and character posns are numbered 
907       *  starting with 1; a character position of zero in pos2 indicates the
908       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
909       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
910       *  (the end of the field). opts are ordering options. The supported options
911       *  are:
912       *    -n, (Sort numerically)
913       *    -r, (Reverse the result of comparison)                 
914       */
915      public void setKeyFieldComparatorOptions(String keySpec) {
916        setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
917        set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
918      }
919      
920      /**
921       * Get the {@link KeyFieldBasedComparator} options
922       */
923      public String getKeyFieldComparatorOption() {
924        return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
925      }
926    
927      /**
928       * Set the {@link KeyFieldBasedPartitioner} options used for 
929       * {@link Partitioner}
930       * 
931       * @param keySpec the key specification of the form -k pos1[,pos2], where,
932       *  pos is of the form f[.c][opts], where f is the number
933       *  of the key field to use, and c is the number of the first character from
934       *  the beginning of the field. Fields and character posns are numbered 
935       *  starting with 1; a character position of zero in pos2 indicates the
936       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
937       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
938       *  (the end of the field).
939       */
940      public void setKeyFieldPartitionerOptions(String keySpec) {
941        setPartitionerClass(KeyFieldBasedPartitioner.class);
942        set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
943      }
944      
945      /**
946       * Get the {@link KeyFieldBasedPartitioner} options
947       */
948      public String getKeyFieldPartitionerOption() {
949        return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
950      }
951    
952      /** 
953       * Get the user defined {@link WritableComparable} comparator for 
954       * grouping keys of inputs to the reduce.
955       * 
956       * @return comparator set by the user for grouping values.
957       * @see #setOutputValueGroupingComparator(Class) for details.  
958       */
959      public RawComparator getOutputValueGroupingComparator() {
960        Class<? extends RawComparator> theClass = getClass(
961          JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
962        if (theClass == null) {
963          return getOutputKeyComparator();
964        }
965        
966        return ReflectionUtils.newInstance(theClass, this);
967      }
968    
969      /** 
970       * Set the user defined {@link RawComparator} comparator for 
971       * grouping keys in the input to the reduce.
972       * 
973       * <p>This comparator should be provided if the equivalence rules for keys
974       * for sorting the intermediates are different from those for grouping keys
975       * before each call to 
976       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
977       *  
978       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
979       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
980       * 
981       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
982       * how keys are sorted, this can be used in conjunction to simulate 
983       * <i>secondary sort on values</i>.</p>
984       *  
985       * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
986       * <i>stable</i> in any sense. (In any case, with the order of available 
987       * map-outputs to the reduce being non-deterministic, it wouldn't make 
988       * that much sense.)</p>
989       * 
990       * @param theClass the comparator class to be used for grouping keys. 
991       *                 It should implement <code>RawComparator</code>.
992       * @see #setOutputKeyComparatorClass(Class)                 
993       */
994      public void setOutputValueGroupingComparator(
995          Class<? extends RawComparator> theClass) {
996        setClass(JobContext.GROUP_COMPARATOR_CLASS,
997                 theClass, RawComparator.class);
998      }
999    
1000      /**
1001       * Should the framework use the new context-object code for running
1002       * the mapper?
1003       * @return true, if the new api should be used
1004       */
1005      public boolean getUseNewMapper() {
1006        return getBoolean("mapred.mapper.new-api", false);
1007      }
1008      /**
1009       * Set whether the framework should use the new api for the mapper.
1010       * This is the default for jobs submitted with the new Job api.
1011       * @param flag true, if the new api should be used
1012       */
1013      public void setUseNewMapper(boolean flag) {
1014        setBoolean("mapred.mapper.new-api", flag);
1015      }
1016    
1017      /**
1018       * Should the framework use the new context-object code for running
1019       * the reducer?
1020       * @return true, if the new api should be used
1021       */
1022      public boolean getUseNewReducer() {
1023        return getBoolean("mapred.reducer.new-api", false);
1024      }
1025      /**
1026       * Set whether the framework should use the new api for the reducer. 
1027       * This is the default for jobs submitted with the new Job api.
1028       * @param flag true, if the new api should be used
1029       */
1030      public void setUseNewReducer(boolean flag) {
1031        setBoolean("mapred.reducer.new-api", flag);
1032      }
1033    
1034      /**
1035       * Get the value class for job outputs.
1036       * 
1037       * @return the value class for job outputs.
1038       */
1039      public Class<?> getOutputValueClass() {
1040        return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1041      }
1042      
1043      /**
1044       * Set the value class for job outputs.
1045       * 
1046       * @param theClass the value class for job outputs.
1047       */
1048      public void setOutputValueClass(Class<?> theClass) {
1049        setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1050      }
1051    
1052      /**
1053       * Get the {@link Mapper} class for the job.
1054       * 
1055       * @return the {@link Mapper} class for the job.
1056       */
1057      public Class<? extends Mapper> getMapperClass() {
1058        return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1059      }
1060      
1061      /**
1062       * Set the {@link Mapper} class for the job.
1063       * 
1064       * @param theClass the {@link Mapper} class for the job.
1065       */
1066      public void setMapperClass(Class<? extends Mapper> theClass) {
1067        setClass("mapred.mapper.class", theClass, Mapper.class);
1068      }
1069    
1070      /**
1071       * Get the {@link MapRunnable} class for the job.
1072       * 
1073       * @return the {@link MapRunnable} class for the job.
1074       */
1075      public Class<? extends MapRunnable> getMapRunnerClass() {
1076        return getClass("mapred.map.runner.class",
1077                        MapRunner.class, MapRunnable.class);
1078      }
1079      
1080      /**
1081       * Expert: Set the {@link MapRunnable} class for the job.
1082       * 
1083       * Typically used to exert greater control on {@link Mapper}s.
1084       * 
1085       * @param theClass the {@link MapRunnable} class for the job.
1086       */
1087      public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1088        setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1089      }
1090    
1091      /**
1092       * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
1093       * to be sent to the {@link Reducer}s.
1094       * 
1095       * @return the {@link Partitioner} used to partition map-outputs.
1096       */
1097      public Class<? extends Partitioner> getPartitionerClass() {
1098        return getClass("mapred.partitioner.class",
1099                        HashPartitioner.class, Partitioner.class);
1100      }
1101      
1102      /**
1103       * Set the {@link Partitioner} class used to partition 
1104       * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1105       * 
1106       * @param theClass the {@link Partitioner} used to partition map-outputs.
1107       */
1108      public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1109        setClass("mapred.partitioner.class", theClass, Partitioner.class);
1110      }
1111    
1112      /**
1113       * Get the {@link Reducer} class for the job.
1114       * 
1115       * @return the {@link Reducer} class for the job.
1116       */
1117      public Class<? extends Reducer> getReducerClass() {
1118        return getClass("mapred.reducer.class",
1119                        IdentityReducer.class, Reducer.class);
1120      }
1121      
1122      /**
1123       * Set the {@link Reducer} class for the job.
1124       * 
1125       * @param theClass the {@link Reducer} class for the job.
1126       */
1127      public void setReducerClass(Class<? extends Reducer> theClass) {
1128        setClass("mapred.reducer.class", theClass, Reducer.class);
1129      }
1130    
1131      /**
1132       * Get the user-defined <i>combiner</i> class used to combine map-outputs 
1133       * before being sent to the reducers. Typically the combiner is same as the
1134       * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1135       * 
1136       * @return the user-defined combiner class used to combine map-outputs.
1137       */
1138      public Class<? extends Reducer> getCombinerClass() {
1139        return getClass("mapred.combiner.class", null, Reducer.class);
1140      }
1141    
1142      /**
1143       * Set the user-defined <i>combiner</i> class used to combine map-outputs 
1144       * before being sent to the reducers. 
1145       * 
1146       * <p>The combiner is an application-specified aggregation operation, which
1147       * can help cut down the amount of data transferred between the 
1148       * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1149       * 
1150       * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1151       * the mapper and reducer tasks. In general, the combiner is called as the
1152       * sort/merge result is written to disk. The combiner must:
1153       * <ul>
1154       *   <li> be side-effect free</li>
1155       *   <li> have the same input and output key types and the same input and 
1156       *        output value types</li>
1157       * </ul></p>
1158       * 
1159       * <p>Typically the combiner is same as the <code>Reducer</code> for the  
1160       * job i.e. {@link #setReducerClass(Class)}.</p>
1161       * 
1162       * @param theClass the user-defined combiner class used to combine 
1163       *                 map-outputs.
1164       */
1165      public void setCombinerClass(Class<? extends Reducer> theClass) {
1166        setClass("mapred.combiner.class", theClass, Reducer.class);
1167      }
1168      
1169      /**
1170       * Should speculative execution be used for this job? 
1171       * Defaults to <code>true</code>.
1172       * 
1173       * @return <code>true</code> if speculative execution be used for this job,
1174       *         <code>false</code> otherwise.
1175       */
1176      public boolean getSpeculativeExecution() { 
1177        return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1178      }
1179      
1180      /**
1181       * Turn speculative execution on or off for this job. 
1182       * 
1183       * @param speculativeExecution <code>true</code> if speculative execution 
1184       *                             should be turned on, else <code>false</code>.
1185       */
1186      public void setSpeculativeExecution(boolean speculativeExecution) {
1187        setMapSpeculativeExecution(speculativeExecution);
1188        setReduceSpeculativeExecution(speculativeExecution);
1189      }
1190    
1191      /**
1192       * Should speculative execution be used for this job for map tasks? 
1193       * Defaults to <code>true</code>.
1194       * 
1195       * @return <code>true</code> if speculative execution be 
1196       *                           used for this job for map tasks,
1197       *         <code>false</code> otherwise.
1198       */
1199      public boolean getMapSpeculativeExecution() { 
1200        return getBoolean(JobContext.MAP_SPECULATIVE, true);
1201      }
1202      
1203      /**
1204       * Turn speculative execution on or off for this job for map tasks. 
1205       * 
1206       * @param speculativeExecution <code>true</code> if speculative execution 
1207       *                             should be turned on for map tasks,
1208       *                             else <code>false</code>.
1209       */
1210      public void setMapSpeculativeExecution(boolean speculativeExecution) {
1211        setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1212      }
1213    
1214      /**
1215       * Should speculative execution be used for this job for reduce tasks? 
1216       * Defaults to <code>true</code>.
1217       * 
1218       * @return <code>true</code> if speculative execution be used 
1219       *                           for reduce tasks for this job,
1220       *         <code>false</code> otherwise.
1221       */
1222      public boolean getReduceSpeculativeExecution() { 
1223        return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1224      }
1225      
1226      /**
1227       * Turn speculative execution on or off for this job for reduce tasks. 
1228       * 
1229       * @param speculativeExecution <code>true</code> if speculative execution 
1230       *                             should be turned on for reduce tasks,
1231       *                             else <code>false</code>.
1232       */
1233      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1234        setBoolean(JobContext.REDUCE_SPECULATIVE, 
1235                   speculativeExecution);
1236      }
1237    
1238      /**
1239       * Get configured the number of reduce tasks for this job.
1240       * Defaults to <code>1</code>.
1241       * 
1242       * @return the number of reduce tasks for this job.
1243       */
1244      public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1245      
1246      /**
1247       * Set the number of map tasks for this job.
1248       * 
1249       * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
1250       * number of spawned map tasks depends on the number of {@link InputSplit}s 
1251       * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1252       *  
1253       * A custom {@link InputFormat} is typically used to accurately control 
1254       * the number of map tasks for the job.</p>
1255       * 
1256       * <h4 id="NoOfMaps">How many maps?</h4>
1257       * 
1258       * <p>The number of maps is usually driven by the total size of the inputs 
1259       * i.e. total number of blocks of the input files.</p>
1260       *  
1261       * <p>The right level of parallelism for maps seems to be around 10-100 maps 
1262       * per-node, although it has been set up to 300 or so for very cpu-light map 
1263       * tasks. Task setup takes awhile, so it is best if the maps take at least a 
1264       * minute to execute.</p>
1265       * 
1266       * <p>The default behavior of file-based {@link InputFormat}s is to split the 
1267       * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
1268       * bytes, of input files. However, the {@link FileSystem} blocksize of the 
1269       * input files is treated as an upper bound for input splits. A lower bound 
1270       * on the split size can be set via 
1271       * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1272       * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1273       *  
1274       * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
1275       * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
1276       * used to set it even higher.</p>
1277       * 
1278       * @param n the number of map tasks for this job.
1279       * @see InputFormat#getSplits(JobConf, int)
1280       * @see FileInputFormat
1281       * @see FileSystem#getDefaultBlockSize()
1282       * @see FileStatus#getBlockSize()
1283       */
1284      public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1285    
1286      /**
1287       * Get configured the number of reduce tasks for this job. Defaults to 
1288       * <code>1</code>.
1289       * 
1290       * @return the number of reduce tasks for this job.
1291       */
1292      public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1293      
1294      /**
1295       * Set the requisite number of reduce tasks for this job.
1296       * 
1297       * <h4 id="NoOfReduces">How many reduces?</h4>
1298       * 
1299       * <p>The right number of reduces seems to be <code>0.95</code> or 
1300       * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
1301       * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1302       * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1303       * </p>
1304       * 
1305       * <p>With <code>0.95</code> all of the reduces can launch immediately and 
1306       * start transfering map outputs as the maps finish. With <code>1.75</code> 
1307       * the faster nodes will finish their first round of reduces and launch a 
1308       * second wave of reduces doing a much better job of load balancing.</p>
1309       * 
1310       * <p>Increasing the number of reduces increases the framework overhead, but 
1311       * increases load balancing and lowers the cost of failures.</p>
1312       * 
1313       * <p>The scaling factors above are slightly less than whole numbers to 
1314       * reserve a few reduce slots in the framework for speculative-tasks, failures
1315       * etc.</p> 
1316       *
1317       * <h4 id="ReducerNone">Reducer NONE</h4>
1318       * 
1319       * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1320       * 
1321       * <p>In this case the output of the map-tasks directly go to distributed 
1322       * file-system, to the path set by 
1323       * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
1324       * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1325       * 
1326       * @param n the number of reduce tasks for this job.
1327       */
1328      public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1329      
1330      /** 
1331       * Get the configured number of maximum attempts that will be made to run a
1332       * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1333       * property. If this property is not already set, the default is 4 attempts.
1334       *  
1335       * @return the max number of attempts per map task.
1336       */
1337      public int getMaxMapAttempts() {
1338        return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1339      }
1340      
1341      /** 
1342       * Expert: Set the number of maximum attempts that will be made to run a
1343       * map task.
1344       * 
1345       * @param n the number of attempts per map task.
1346       */
1347      public void setMaxMapAttempts(int n) {
1348        setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1349      }
1350    
1351      /** 
1352       * Get the configured number of maximum attempts  that will be made to run a
1353       * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1354       * property. If this property is not already set, the default is 4 attempts.
1355       * 
1356       * @return the max number of attempts per reduce task.
1357       */
1358      public int getMaxReduceAttempts() {
1359        return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1360      }
1361      /** 
1362       * Expert: Set the number of maximum attempts that will be made to run a
1363       * reduce task.
1364       * 
1365       * @param n the number of attempts per reduce task.
1366       */
1367      public void setMaxReduceAttempts(int n) {
1368        setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1369      }
1370      
1371      /**
1372       * Get the user-specified job name. This is only used to identify the 
1373       * job to the user.
1374       * 
1375       * @return the job's name, defaulting to "".
1376       */
1377      public String getJobName() {
1378        return get(JobContext.JOB_NAME, "");
1379      }
1380      
1381      /**
1382       * Set the user-specified job name.
1383       * 
1384       * @param name the job's new name.
1385       */
1386      public void setJobName(String name) {
1387        set(JobContext.JOB_NAME, name);
1388      }
1389      
1390      /**
1391       * Get the user-specified session identifier. The default is the empty string.
1392       *
1393       * The session identifier is used to tag metric data that is reported to some
1394       * performance metrics system via the org.apache.hadoop.metrics API.  The 
1395       * session identifier is intended, in particular, for use by Hadoop-On-Demand 
1396       * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
1397       * HOD will set the session identifier by modifying the mapred-site.xml file 
1398       * before starting the cluster.
1399       *
1400       * When not running under HOD, this identifer is expected to remain set to 
1401       * the empty string.
1402       *
1403       * @return the session identifier, defaulting to "".
1404       */
1405      @Deprecated
1406      public String getSessionId() {
1407          return get("session.id", "");
1408      }
1409      
1410      /**
1411       * Set the user-specified session identifier.  
1412       *
1413       * @param sessionId the new session id.
1414       */
1415      @Deprecated
1416      public void setSessionId(String sessionId) {
1417          set("session.id", sessionId);
1418      }
1419        
1420      /**
1421       * Set the maximum no. of failures of a given job per tasktracker.
1422       * If the no. of task failures exceeds <code>noFailures</code>, the 
1423       * tasktracker is <i>blacklisted</i> for this job. 
1424       * 
1425       * @param noFailures maximum no. of failures of a given job per tasktracker.
1426       */
1427      public void setMaxTaskFailuresPerTracker(int noFailures) {
1428        setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1429      }
1430      
1431      /**
1432       * Expert: Get the maximum no. of failures of a given job per tasktracker.
1433       * If the no. of task failures exceeds this, the tasktracker is
1434       * <i>blacklisted</i> for this job. 
1435       * 
1436       * @return the maximum no. of failures of a given job per tasktracker.
1437       */
1438      public int getMaxTaskFailuresPerTracker() {
1439        return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1440      }
1441    
1442      /**
1443       * Get the maximum percentage of map tasks that can fail without 
1444       * the job being aborted. 
1445       * 
1446       * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1447       * attempts before being declared as <i>failed</i>.
1448       *  
1449       * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1450       * the job being declared as {@link JobStatus#FAILED}.
1451       * 
1452       * @return the maximum percentage of map tasks that can fail without
1453       *         the job being aborted.
1454       */
1455      public int getMaxMapTaskFailuresPercent() {
1456        return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1457      }
1458    
1459      /**
1460       * Expert: Set the maximum percentage of map tasks that can fail without the
1461       * job being aborted. 
1462       * 
1463       * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1464       * before being declared as <i>failed</i>.
1465       * 
1466       * @param percent the maximum percentage of map tasks that can fail without 
1467       *                the job being aborted.
1468       */
1469      public void setMaxMapTaskFailuresPercent(int percent) {
1470        setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1471      }
1472      
1473      /**
1474       * Get the maximum percentage of reduce tasks that can fail without 
1475       * the job being aborted. 
1476       * 
1477       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1478       * attempts before being declared as <i>failed</i>.
1479       * 
1480       * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1481       * in the job being declared as {@link JobStatus#FAILED}.
1482       * 
1483       * @return the maximum percentage of reduce tasks that can fail without
1484       *         the job being aborted.
1485       */
1486      public int getMaxReduceTaskFailuresPercent() {
1487        return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1488      }
1489      
1490      /**
1491       * Set the maximum percentage of reduce tasks that can fail without the job
1492       * being aborted.
1493       * 
1494       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1495       * attempts before being declared as <i>failed</i>.
1496       * 
1497       * @param percent the maximum percentage of reduce tasks that can fail without 
1498       *                the job being aborted.
1499       */
1500      public void setMaxReduceTaskFailuresPercent(int percent) {
1501        setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1502      }
1503      
1504      /**
1505       * Set {@link JobPriority} for this job.
1506       * 
1507       * @param prio the {@link JobPriority} for this job.
1508       */
1509      public void setJobPriority(JobPriority prio) {
1510        set(JobContext.PRIORITY, prio.toString());
1511      }
1512      
1513      /**
1514       * Get the {@link JobPriority} for this job.
1515       * 
1516       * @return the {@link JobPriority} for this job.
1517       */
1518      public JobPriority getJobPriority() {
1519        String prio = get(JobContext.PRIORITY);
1520        if(prio == null) {
1521          return JobPriority.NORMAL;
1522        }
1523        
1524        return JobPriority.valueOf(prio);
1525      }
1526    
1527      /**
1528       * Set JobSubmitHostName for this job.
1529       * 
1530       * @param hostname the JobSubmitHostName for this job.
1531       */
1532      void setJobSubmitHostName(String hostname) {
1533        set(MRJobConfig.JOB_SUBMITHOST, hostname);
1534      }
1535      
1536      /**
1537       * Get the  JobSubmitHostName for this job.
1538       * 
1539       * @return the JobSubmitHostName for this job.
1540       */
1541      String getJobSubmitHostName() {
1542        String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1543        
1544        return hostname;
1545      }
1546    
1547      /**
1548       * Set JobSubmitHostAddress for this job.
1549       * 
1550       * @param hostadd the JobSubmitHostAddress for this job.
1551       */
1552      void setJobSubmitHostAddress(String hostadd) {
1553        set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1554      }
1555      
1556      /**
1557       * Get JobSubmitHostAddress for this job.
1558       * 
1559       * @return  JobSubmitHostAddress for this job.
1560       */
1561      String getJobSubmitHostAddress() {
1562        String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1563        
1564        return hostadd;
1565      }
1566    
1567      /**
1568       * Get whether the task profiling is enabled.
1569       * @return true if some tasks will be profiled
1570       */
1571      public boolean getProfileEnabled() {
1572        return getBoolean(JobContext.TASK_PROFILE, false);
1573      }
1574    
1575      /**
1576       * Set whether the system should collect profiler information for some of 
1577       * the tasks in this job? The information is stored in the user log 
1578       * directory.
1579       * @param newValue true means it should be gathered
1580       */
1581      public void setProfileEnabled(boolean newValue) {
1582        setBoolean(JobContext.TASK_PROFILE, newValue);
1583      }
1584    
1585      /**
1586       * Get the profiler configuration arguments.
1587       *
1588       * The default value for this property is
1589       * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1590       * 
1591       * @return the parameters to pass to the task child to configure profiling
1592       */
1593      public String getProfileParams() {
1594        return get(JobContext.TASK_PROFILE_PARAMS,
1595                   "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1596                     "verbose=n,file=%s");
1597      }
1598    
1599      /**
1600       * Set the profiler configuration arguments. If the string contains a '%s' it
1601       * will be replaced with the name of the profiling output file when the task
1602       * runs.
1603       *
1604       * This value is passed to the task child JVM on the command line.
1605       *
1606       * @param value the configuration string
1607       */
1608      public void setProfileParams(String value) {
1609        set(JobContext.TASK_PROFILE_PARAMS, value);
1610      }
1611    
1612      /**
1613       * Get the range of maps or reduces to profile.
1614       * @param isMap is the task a map?
1615       * @return the task ranges
1616       */
1617      public IntegerRanges getProfileTaskRange(boolean isMap) {
1618        return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1619                           JobContext.NUM_REDUCE_PROFILES), "0-2");
1620      }
1621    
1622      /**
1623       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1624       * must also be called.
1625       * @param newValue a set of integer ranges of the map ids
1626       */
1627      public void setProfileTaskRange(boolean isMap, String newValue) {
1628        // parse the value to make sure it is legal
1629          new Configuration.IntegerRanges(newValue);
1630        set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1631              newValue);
1632      }
1633    
1634      /**
1635       * Set the debug script to run when the map tasks fail.
1636       * 
1637       * <p>The debug script can aid debugging of failed map tasks. The script is 
1638       * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1639       * 
1640       * <p>The debug command, run on the node where the map failed, is:</p>
1641       * <p><pre><blockquote> 
1642       * $script $stdout $stderr $syslog $jobconf.
1643       * </blockquote></pre></p>
1644       * 
1645       * <p> The script file is distributed through {@link DistributedCache} 
1646       * APIs. The script needs to be symlinked. </p>
1647       * 
1648       * <p>Here is an example on how to submit a script 
1649       * <p><blockquote><pre>
1650       * job.setMapDebugScript("./myscript");
1651       * DistributedCache.createSymlink(job);
1652       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1653       * </pre></blockquote></p>
1654       * 
1655       * @param mDbgScript the script name
1656       */
1657      public void  setMapDebugScript(String mDbgScript) {
1658        set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1659      }
1660      
1661      /**
1662       * Get the map task's debug script.
1663       * 
1664       * @return the debug Script for the mapred job for failed map tasks.
1665       * @see #setMapDebugScript(String)
1666       */
1667      public String getMapDebugScript() {
1668        return get(JobContext.MAP_DEBUG_SCRIPT);
1669      }
1670      
1671      /**
1672       * Set the debug script to run when the reduce tasks fail.
1673       * 
1674       * <p>The debug script can aid debugging of failed reduce tasks. The script
1675       * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1676       * 
   * <p>The debug command, run on the node where the reduce failed, is:</p>
1678       * <p><pre><blockquote> 
1679       * $script $stdout $stderr $syslog $jobconf.
1680       * </blockquote></pre></p>
1681       * 
1682       * <p> The script file is distributed through {@link DistributedCache} 
1683       * APIs. The script file needs to be symlinked </p>
1684       * 
1685       * <p>Here is an example on how to submit a script 
1686       * <p><blockquote><pre>
1687       * job.setReduceDebugScript("./myscript");
1688       * DistributedCache.createSymlink(job);
1689       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1690       * </pre></blockquote></p>
1691       * 
1692       * @param rDbgScript the script name
1693       */
1694      public void  setReduceDebugScript(String rDbgScript) {
1695        set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1696      }
1697      
1698      /**
1699       * Get the reduce task's debug Script
1700       * 
1701       * @return the debug script for the mapred job for failed reduce tasks.
1702       * @see #setReduceDebugScript(String)
1703       */
1704      public String getReduceDebugScript() {
1705        return get(JobContext.REDUCE_DEBUG_SCRIPT);
1706      }
1707    
1708      /**
1709       * Get the uri to be invoked in-order to send a notification after the job 
1710       * has completed (success/failure). 
1711       * 
1712       * @return the job end notification uri, <code>null</code> if it hasn't
1713       *         been set.
1714       * @see #setJobEndNotificationURI(String)
1715       */
1716      public String getJobEndNotificationURI() {
1717        return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1718      }
1719    
1720      /**
1721       * Set the uri to be invoked in-order to send a notification after the job
1722       * has completed (success/failure).
1723       * 
1724       * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
1725       * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1726       * identifier and completion-status respectively.</p>
1727       * 
1728       * <p>This is typically used by application-writers to implement chaining of 
1729       * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1730       * 
1731       * @param uri the job end notification uri
1732       * @see JobStatus
1733       * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1734       *       JobCompletionAndChaining">Job Completion and Chaining</a>
1735       */
1736      public void setJobEndNotificationURI(String uri) {
1737        set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1738      }
1739    
1740      /**
1741       * Get job-specific shared directory for use as scratch space
1742       * 
1743       * <p>
1744       * When a job starts, a shared directory is created at location
1745       * <code>
1746       * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1747       * This directory is exposed to the users through 
1748       * <code>mapreduce.job.local.dir </code>.
1749       * So, the tasks can use this space 
1750       * as scratch space and share files among them. </p>
1751       * This value is available as System property also.
1752       * 
1753       * @return The localized job specific shared directory
1754       */
1755      public String getJobLocalDir() {
1756        return get(JobContext.JOB_LOCAL_DIR);
1757      }
1758    
1759      /**
1760       * Get memory required to run a map task of the job, in MB.
1761       * 
1762       * If a value is specified in the configuration, it is returned.
1763       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1764       * <p/>
1765       * For backward compatibility, if the job configuration sets the
1766       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1767       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1768       * after converting it from bytes to MB.
1769       * @return memory required to run a map task of the job, in MB,
1770       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1771       */
1772      public long getMemoryForMapTask() {
1773        long value = getDeprecatedMemoryValue();
1774        if (value == DISABLED_MEMORY_LIMIT) {
1775          value = normalizeMemoryConfigValue(
1776                    getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
1777                              DISABLED_MEMORY_LIMIT));
1778        }
1779        // In case that M/R 1.x applications use the old property name
1780        if (value == DISABLED_MEMORY_LIMIT) {
1781          value = normalizeMemoryConfigValue(
1782                    getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1783                              DISABLED_MEMORY_LIMIT));
1784        }
1785        return value;
1786      }
1787    
1788      public void setMemoryForMapTask(long mem) {
1789        setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1790        // In case that M/R 1.x applications use the old property name
1791        setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1792      }
1793    
1794      /**
1795       * Get memory required to run a reduce task of the job, in MB.
1796       * 
1797       * If a value is specified in the configuration, it is returned.
1798       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1799       * <p/>
1800       * For backward compatibility, if the job configuration sets the
1801       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1802       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1803       * after converting it from bytes to MB.
1804       * @return memory required to run a reduce task of the job, in MB,
1805       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1806       */
1807      public long getMemoryForReduceTask() {
1808        long value = getDeprecatedMemoryValue();
1809        if (value == DISABLED_MEMORY_LIMIT) {
1810          value = normalizeMemoryConfigValue(
1811                    getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
1812                            DISABLED_MEMORY_LIMIT));
1813        }
1814        // In case that M/R 1.x applications use the old property name
1815        if (value == DISABLED_MEMORY_LIMIT) {
1816          value = normalizeMemoryConfigValue(
1817                    getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1818                            DISABLED_MEMORY_LIMIT));
1819        }
1820        return value;
1821      }
1822      
1823      // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1824      // converted into MBs.
1825      // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1826      // value.
1827      private long getDeprecatedMemoryValue() {
1828        long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1829            DISABLED_MEMORY_LIMIT);
1830        oldValue = normalizeMemoryConfigValue(oldValue);
1831        if (oldValue != DISABLED_MEMORY_LIMIT) {
1832          oldValue /= (1024*1024);
1833        }
1834        return oldValue;
1835      }
1836    
1837      public void setMemoryForReduceTask(long mem) {
1838        setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1839        // In case that M/R 1.x applications use the old property name
1840        setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1841      }
1842    
1843      /**
1844       * Return the name of the queue to which this job is submitted.
1845       * Defaults to 'default'.
1846       * 
1847       * @return name of the queue
1848       */
1849      public String getQueueName() {
1850        return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1851      }
1852      
1853      /**
1854       * Set the name of the queue to which this job should be submitted.
1855       * 
1856       * @param queueName Name of the queue
1857       */
1858      public void setQueueName(String queueName) {
1859        set(JobContext.QUEUE_NAME, queueName);
1860      }
1861      
1862      /**
1863       * Normalize the negative values in configuration
1864       * 
1865       * @param val
1866       * @return normalized value
1867       */
1868      public static long normalizeMemoryConfigValue(long val) {
1869        if (val < 0) {
1870          val = DISABLED_MEMORY_LIMIT;
1871        }
1872        return val;
1873      }
1874    
1875      /**
1876       * Compute the number of slots required to run a single map task-attempt
1877       * of this job.
1878       * @param slotSizePerMap cluster-wide value of the amount of memory required
1879       *                       to run a map-task
1880       * @return the number of slots required to run a single map task-attempt
1881       *          1 if memory parameters are disabled.
1882       */
1883      int computeNumSlotsPerMap(long slotSizePerMap) {
1884        if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
1885            (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
1886          return 1;
1887        }
1888        return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
1889      }
1890      
1891      /**
1892       * Compute the number of slots required to run a single reduce task-attempt
1893       * of this job.
1894       * @param slotSizePerReduce cluster-wide value of the amount of memory 
1895       *                          required to run a reduce-task
1896       * @return the number of slots required to run a single reduce task-attempt
1897       *          1 if memory parameters are disabled
1898       */
1899      int computeNumSlotsPerReduce(long slotSizePerReduce) {
1900        if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
1901            (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
1902          return 1;
1903        }
1904        return 
1905        (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
1906      }
1907    
1908      /** 
1909       * Find a jar that contains a class of the same name, if any.
1910       * It will return a jar file, even if that is not the first thing
1911       * on the class path that has a class with the same name.
1912       * 
1913       * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
1917      public static String findContainingJar(Class my_class) {
1918        return ClassUtil.findContainingJar(my_class);
1919      }
1920    
1921      /**
1922       * Get the memory required to run a task of this job, in bytes. See
1923       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1924       * <p/>
1925       * This method is deprecated. Now, different memory limits can be
1926       * set for map and reduce tasks of a job, in MB. 
1927       * <p/>
1928       * For backward compatibility, if the job configuration sets the
1929       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1930       * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 
1931       * Otherwise, this method will return the larger of the values returned by 
1932       * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1933       * after converting them into bytes.
1934       *
1935       * @return Memory required to run a task of this job, in bytes,
1936       *          or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1937       * @see #setMaxVirtualMemoryForTask(long)
1938       * @deprecated Use {@link #getMemoryForMapTask()} and
1939       *             {@link #getMemoryForReduceTask()}
1940       */
1941      @Deprecated
1942      public long getMaxVirtualMemoryForTask() {
1943        LOG.warn(
1944          "getMaxVirtualMemoryForTask() is deprecated. " +
1945          "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1946    
1947        long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1948        value = normalizeMemoryConfigValue(value);
1949        if (value == DISABLED_MEMORY_LIMIT) {
1950          value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1951          value = normalizeMemoryConfigValue(value);
1952          if (value != DISABLED_MEMORY_LIMIT) {
1953            value *= 1024*1024;
1954          }
1955        }
1956        return value;
1957      }
1958    
1959      /**
1960       * Set the maximum amount of memory any task of this job can use. See
1961       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1962       * <p/>
   * mapred.task.maxvmem is split into
   * mapreduce.map.memory.mb
   * and mapreduce.reduce.memory.mb;
   * each of the new keys is set
   * to mapred.task.maxvmem / (1024 * 1024),
   * as the new values are in MB rather than bytes.
1969       *
1970       * @param vmem Maximum amount of virtual memory in bytes any task of this job
1971       *             can use.
1972       * @see #getMaxVirtualMemoryForTask()
1973       * @deprecated
1974       *  Use {@link #setMemoryForMapTask(long mem)}  and
1975       *  Use {@link #setMemoryForReduceTask(long mem)}
1976       */
  @Deprecated
  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
      "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    // An invalid value (negative but not the DISABLED sentinel) disables
    // both per-task limits.
    // NOTE(review): there is no early return here, so when the deprecated
    // key is unset the block below still runs and overwrites these values
    // with vmem / (1024*1024) — confirm whether a `return` was intended.
    if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
    }

    // If the deprecated key was never set, translate the byte value into
    // the two MB-valued replacement keys; otherwise keep honoring the
    // deprecated key directly.
    if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
      setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
    }else{
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
    }
  }
1993    
1994      /**
   * @deprecated this variable is deprecated and no longer in use.
1996       */
1997      @Deprecated
1998      public long getMaxPhysicalMemoryForTask() {
1999        LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2000                  + " Refer to the APIs getMemoryForMapTask() and"
2001                  + " getMemoryForReduceTask() for details.");
2002        return -1;
2003      }
2004    
2005      /*
2006       * @deprecated this
2007       */
2008      @Deprecated
2009      public void setMaxPhysicalMemoryForTask(long mem) {
2010        LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2011            + " The value set is ignored. Refer to "
2012            + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2013      }
2014    
2015      static String deprecatedString(String key) {
2016        return "The variable " + key + " is no longer used.";
2017      }
2018    
2019      private void checkAndWarnDeprecation() {
2020        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2021          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2022                    + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2023                    + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2024        }
2025        if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
2026          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
2027        }
2028        if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
2029          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
2030        }
2031        if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
2032          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
2033        }
2034      }
2035      
2036    
2037    }
2038