/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straightforward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), others interact subtly
 *   with the rest of the framework and/or the job configuration and are
 *   relatively more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used, etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job, such as the <code>Comparator</code>s to be used, files to be put
 * in the {@link DistributedCache}, whether or not intermediate and/or job
 * outputs are to be compressed (and how), and debuggability via user-provided
 * scripts ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing task logs, a task's stdout, stderr, and syslog.</p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  static {
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   * Deprecated because it makes no sense in the context of MR2.
   */
  @Deprecated
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars. */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
    JobContext.MAP_JAVA_OPTS;
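
  // A minimal usage sketch (illustrative; "conf" is an assumed JobConf
  // instance, and the opts string mirrors the gc-logging example documented
  // above, with @taskid@ interpolated by the framework):
  //
  //   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
  //            "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");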

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass other environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
    JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
    "mapreduce.reduce.ulimit";

  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *                 {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
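
  // A minimal usage sketch (illustrative; "conf" is an assumed JobConf
  // instance). The value follows the documented k1=v1,k2=v2 format and,
  // on Linux, appends to the inherited LD_LIBRARY_PATH:
  //
  //   conf.set(JobConf.MAPRED_MAP_TASK_ENV,
  //            "A=foo,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native");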

  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
    JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
    JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }

  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }

  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }

  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }

  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if framework should keep the intermediate files
   *             for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *        task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
   * @param numTasks the number of tasks to execute; defaults to 1;
   * -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }
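
  // A minimal usage sketch (illustrative; "conf" is an assumed JobConf
  // instance). GzipCodec ships with Hadoop; any CompressionCodec subclass
  // can be used here:
  //
  //   conf.setMapOutputCompressorClass(
  //       org.apache.hadoop.io.compress.GzipCodec.class);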

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }
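
  // A minimal usage sketch (illustrative; "conf" is an assumed JobConf
  // instance). Intermediate map-output types may differ from the final
  // job-output types, which they otherwise default to:
  //
  //   conf.setMapOutputKeyClass(Text.class);
  //   conf.setMapOutputValueClass(LongWritable.class);
  //   conf.setOutputKeyClass(Text.class);
  //   conf.setOutputValueClass(Text.class);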

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }
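
  // A minimal usage sketch (illustrative; "conf" is an assumed JobConf
  // instance). Per the option grammar above, "-k2,2nr" compares on the
  // second key field, numerically, in reverse order:
  //
  //   conf.setKeyFieldComparatorOptions("-k2,2nr");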

  /**
   * Get the {@link KeyFieldBasedComparator} options
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   * combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
        theClass, RawComparator.class);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }
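
  // A secondary-sort sketch (illustrative; MyFullKeyComparator and
  // MyPrimaryKeyGroupingComparator are assumed user-supplied RawComparator
  // implementations). Sort on the full (primary, secondary) key, but group
  // values at the reducer by the primary part only:
  //
  //   conf.setOutputKeyComparatorClass(MyFullKeyComparator.class);
  //   conf.setOutputValueGroupingComparator(MyPrimaryKeyGroupingComparator.class);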

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   *
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }

  /**
   * Set the {@link Mapper} class for the job.
   *
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   *
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }

  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   *
   * Typically used to exert greater control on {@link Mapper}s.
   *
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
   * to be sent to the {@link Reducer}s.
   *
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

  /**
   * Set the {@link Partitioner} class used to partition
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   *
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }

  /**
   * Set the {@link Reducer} class for the job.
   *
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
   *
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers.
   *
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   *
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and
   *        output value types</li>
   * </ul>
   *
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job i.e. {@link #setReducerClass(Class)}.</p>
   *
   * @param theClass the user-defined combiner class used to combine
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }

  /**
   * Should speculative execution be used for this job?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be used for this
   *         job, <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() {
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be used
   *                           for this job for map tasks,
   *         <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() {
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be used
   *                           for reduce tasks for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() {
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE,
               speculativeExecution);
  }

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   *
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }

  /**
   * Set the number of map tasks for this job.
   *
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
   * number of spawned map tasks depends on the number of {@link InputSplit}s
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *
   * A custom {@link InputFormat} is typically used to accurately control
   * the number of map tasks for the job.</p>
   *
   * <b id="NoOfMaps">How many maps?</b>
   *
   * <p>The number of maps is usually driven by the total size of the inputs
   * i.e. total number of blocks of the input files.</p>
   *
   * <p>The right level of parallelism for maps seems to be around 10-100 maps
   * per-node, although it has been set up to 300 or so for very cpu-light map
   * tasks. Task setup takes a while, so it is best if the maps take at least a
   * minute to execute.</p>
   *
   * <p>The default behavior of file-based {@link InputFormat}s is to split the
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in
   * bytes, of input files. However, the {@link FileSystem} blocksize of the
   * input files is treated as an upper bound for input splits. A lower bound
   * on the split size can be set via
   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
   * used to set it even higher.</p>
   *
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }

  /**
   * Get the configured number of reduce tasks for this job. Defaults to
   * <code>1</code>.
   *
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

  /**
   * Set the requisite number of reduce tasks for this job.
   *
   * <b id="NoOfReduces">How many reduces?</b>
   *
   * <p>The right number of reduces seems to be <code>0.95</code> or
   * <code>1.75</code> multiplied by (
   * <i>available memory for reduce tasks</i>
   * (The value of this should be smaller than
   * numNodes * yarn.nodemanager.resource.memory-mb
   * since the resource of memory is shared by map tasks and other
   * applications) /
   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.reduce.memory.mb">
   * mapreduce.reduce.memory.mb</a>).
   * </p>
   *
   * <p>With <code>0.95</code> all of the reduces can launch immediately and
   * start transferring map outputs as the maps finish. With <code>1.75</code>
   * the faster nodes will finish their first round of reduces and launch a
   * second wave of reduces doing a much better job of load balancing.</p>
   *
   * <p>Increasing the number of reduces increases the framework overhead, but
   * improves load balancing and lowers the cost of failures.</p>
   *
   * <p>The scaling factors above are slightly less than whole numbers to
   * reserve a few reduce slots in the framework for speculative-tasks, failures,
   * etc.</p>
   *
   * <b id="ReducerNone">Reducer NONE</b>
   *
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   *
   * <p>In this case the outputs of the map-tasks go directly to the distributed
   * file-system, to the path set by
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   *
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
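
  // A sizing sketch for the 0.95 / 1.75 heuristics documented above. The
  // cluster figures here are illustrative assumptions, not defaults:
  //
  //   // e.g. 10 nodes, 8192 MB usable for reduces per node, 1024 MB per reduce
  //   int reduceSlots = 10 * (8192 / 1024);               // 80 concurrent reduces
  //   conf.setNumReduceTasks((int) (0.95 * reduceSlots)); // one wave: 76 reduces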

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per map task.
   */
  public int getMaxMapAttempts() {
    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }

  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API.  The
   * session identifier is intended, in particular, for use by Hadoop-On-Demand
   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
   * HOD will set the session identifier by modifying the mapred-site.xml file
   * before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
    return get("session.id", "");
  }

  /**
   * Set the user-specified session identifier.
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
    set("session.id", sessionId);
  }
1475    
1476  /**
1477   * Set the maximum no. of failures of a given job per tasktracker.
1478   * If the no. of task failures exceeds <code>noFailures</code>, the 
1479   * tasktracker is <i>blacklisted</i> for this job. 
1480   * 
1481   * @param noFailures maximum no. of failures of a given job per tasktracker.
1482   */
1483  public void setMaxTaskFailuresPerTracker(int noFailures) {
1484    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1485  }
1486  
1487  /**
1488   * Expert: Get the maximum no. of failures of a given job per tasktracker.
1489   * If the no. of task failures exceeds this, the tasktracker is
1490   * <i>blacklisted</i> for this job. 
1491   * 
1492   * @return the maximum no. of failures of a given job per tasktracker.
1493   */
1494  public int getMaxTaskFailuresPerTracker() {
1495    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1496  }
1497
1498  /**
1499   * Get the maximum percentage of map tasks that can fail without 
1500   * the job being aborted. 
1501   * 
1502   * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1503   * attempts before being declared as <i>failed</i>.
1504   *  
1505   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1506   * the job being declared as {@link JobStatus#FAILED}.
1507   * 
1508   * @return the maximum percentage of map tasks that can fail without
1509   *         the job being aborted.
1510   */
1511  public int getMaxMapTaskFailuresPercent() {
1512    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1513  }
1514
1515  /**
1516   * Expert: Set the maximum percentage of map tasks that can fail without the
1517   * job being aborted. 
1518   * 
1519   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1520   * before being declared as <i>failed</i>.
1521   * 
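   * <p>For example, to tolerate up to 5% of map tasks failing:</p>
   * <p><blockquote><pre>
   * job.setMaxMapTaskFailuresPercent(5);
   * </pre></blockquote>
   * 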
1522   * @param percent the maximum percentage of map tasks that can fail without 
1523   *                the job being aborted.
1524   */
1525  public void setMaxMapTaskFailuresPercent(int percent) {
1526    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1527  }
1528  
1529  /**
1530   * Get the maximum percentage of reduce tasks that can fail without 
1531   * the job being aborted. 
1532   * 
1533   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1534   * attempts before being declared as <i>failed</i>.
1535   * 
1536   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1537   * in the job being declared as {@link JobStatus#FAILED}.
1538   * 
1539   * @return the maximum percentage of reduce tasks that can fail without
1540   *         the job being aborted.
1541   */
1542  public int getMaxReduceTaskFailuresPercent() {
1543    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1544  }
1545  
1546  /**
1547   * Set the maximum percentage of reduce tasks that can fail without the job
1548   * being aborted.
1549   * 
1550   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1551   * attempts before being declared as <i>failed</i>.
1552   * 
1553   * @param percent the maximum percentage of reduce tasks that can fail without 
1554   *                the job being aborted.
1555   */
1556  public void setMaxReduceTaskFailuresPercent(int percent) {
1557    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1558  }
1559  
1560  /**
1561   * Set {@link JobPriority} for this job.
1562   *
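   * <p>For example:</p>
   * <p><blockquote><pre>
   * job.setJobPriority(JobPriority.HIGH);
   * </pre></blockquote>
   *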
1563   * @param prio the {@link JobPriority} for this job.
1564   */
1565  public void setJobPriority(JobPriority prio) {
1566    set(JobContext.PRIORITY, prio.toString());
1567  }
1568
1569  /**
   * Set the priority for this job as an integer, where <code>0</code> is
   * <code>DEFAULT</code> and <code>1</code> (<code>VERY_LOW</code>) through
   * <code>5</code> (<code>VERY_HIGH</code>) map to the corresponding
   * {@link JobPriority} values.
1571   *
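   * <p>For example, <code>4</code> corresponds to <code>HIGH</code>:</p>
   * <p><blockquote><pre>
   * job.setJobPriorityAsInteger(4);
   * </pre></blockquote>
   *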
   * @param prio the priority of this job, as an integer.
1573   */
1574  public void setJobPriorityAsInteger(int prio) {
1575    set(JobContext.PRIORITY, Integer.toString(prio));
1576  }
1577
1578  /**
1579   * Get the {@link JobPriority} for this job.
1580   *
1581   * @return the {@link JobPriority} for this job.
1582   */
1583  public JobPriority getJobPriority() {
1584    String prio = get(JobContext.PRIORITY);
1585    if (prio == null) {
1586      return JobPriority.DEFAULT;
1587    }
1588
1589    JobPriority priority = JobPriority.DEFAULT;
1590    try {
1591      priority = JobPriority.valueOf(prio);
1592    } catch (IllegalArgumentException e) {
1593      return convertToJobPriority(Integer.parseInt(prio));
1594    }
1595    return priority;
1596  }
1597
1598  /**
1599   * Get the priority for this job.
1600   *
1601   * @return the priority for this job.
1602   */
1603  public int getJobPriorityAsInteger() {
1604    String priority = get(JobContext.PRIORITY);
1605    if (priority == null) {
1606      return 0;
1607    }
1608
1609    int jobPriority = 0;
1610    try {
1611      jobPriority = convertPriorityToInteger(priority);
1612    } catch (IllegalArgumentException e) {
1613      return Integer.parseInt(priority);
1614    }
1615    return jobPriority;
1616  }
1617
1618  private int convertPriorityToInteger(String priority) {
1619    JobPriority jobPriority = JobPriority.valueOf(priority);
1620    switch (jobPriority) {
1621    case VERY_HIGH :
1622      return 5;
1623    case HIGH :
1624      return 4;
1625    case NORMAL :
1626      return 3;
1627    case LOW :
1628      return 2;
1629    case VERY_LOW :
1630      return 1;
1631    case DEFAULT :
1632      return 0;
1633    default:
1634      break;
1635    }
1636
    // If a user sets the priority to "UNDEFINED_PRIORITY", return 0, which
    // is also the default value.
1639    return 0;
1640  }
1641
1642  private JobPriority convertToJobPriority(int priority) {
1643    switch (priority) {
1644    case 5 :
1645      return JobPriority.VERY_HIGH;
1646    case 4 :
1647      return JobPriority.HIGH;
1648    case 3 :
1649      return JobPriority.NORMAL;
1650    case 2 :
1651      return JobPriority.LOW;
1652    case 1 :
1653      return JobPriority.VERY_LOW;
1654    case 0 :
1655      return JobPriority.DEFAULT;
1656    default:
1657      break;
1658    }
1659
1660    return JobPriority.UNDEFINED_PRIORITY;
1661  }
1662
1663  /**
1664   * Set JobSubmitHostName for this job.
1665   * 
1666   * @param hostname the JobSubmitHostName for this job.
1667   */
1668  void setJobSubmitHostName(String hostname) {
1669    set(MRJobConfig.JOB_SUBMITHOST, hostname);
1670  }
1671  
1672  /**
   * Get the JobSubmitHostName for this job.
1674   * 
1675   * @return the JobSubmitHostName for this job.
1676   */
  String getJobSubmitHostName() {
    return get(MRJobConfig.JOB_SUBMITHOST);
  }
1682
1683  /**
1684   * Set JobSubmitHostAddress for this job.
1685   * 
1686   * @param hostadd the JobSubmitHostAddress for this job.
1687   */
1688  void setJobSubmitHostAddress(String hostadd) {
1689    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1690  }
1691  
1692  /**
1693   * Get JobSubmitHostAddress for this job.
1694   * 
1695   * @return  JobSubmitHostAddress for this job.
1696   */
  String getJobSubmitHostAddress() {
    return get(MRJobConfig.JOB_SUBMITHOSTADDR);
  }
1702
1703  /**
1704   * Get whether the task profiling is enabled.
1705   * @return true if some tasks will be profiled
1706   */
1707  public boolean getProfileEnabled() {
1708    return getBoolean(JobContext.TASK_PROFILE, false);
1709  }
1710
1711  /**
   * Set whether the system should collect profiler information for some of 
   * the tasks in this job. The information is stored in the user log 
   * directory.
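   *
   * <p>A typical sequence, using the default profile range (see
   * {@link #setProfileTaskRange(boolean, String)}):</p>
   * <p><blockquote><pre>
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-2");   // profile map tasks 0, 1 and 2
   * </pre></blockquote>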
1715   * @param newValue true means it should be gathered
1716   */
1717  public void setProfileEnabled(boolean newValue) {
1718    setBoolean(JobContext.TASK_PROFILE, newValue);
1719  }
1720
1721  /**
1722   * Get the profiler configuration arguments.
1723   *
1724   * The default value for this property is
1725   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1726   * 
1727   * @return the parameters to pass to the task child to configure profiling
1728   */
1729  public String getProfileParams() {
1730    return get(JobContext.TASK_PROFILE_PARAMS,
1731        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
1732  }
1733
1734  /**
1735   * Set the profiler configuration arguments. If the string contains a '%s' it
1736   * will be replaced with the name of the profiling output file when the task
1737   * runs.
1738   *
1739   * This value is passed to the task child JVM on the command line.
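   *
   * <p>For example, mirroring the documented default hprof settings:</p>
   * <p><blockquote><pre>
   * job.setProfileParams(
   *     "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
   *     "verbose=n,file=%s");
   * </pre></blockquote>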
1740   *
1741   * @param value the configuration string
1742   */
1743  public void setProfileParams(String value) {
1744    set(JobContext.TASK_PROFILE_PARAMS, value);
1745  }
1746
1747  /**
1748   * Get the range of maps or reduces to profile.
1749   * @param isMap is the task a map?
1750   * @return the task ranges
1751   */
1752  public IntegerRanges getProfileTaskRange(boolean isMap) {
1753    return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1754                       JobContext.NUM_REDUCE_PROFILES), "0-2");
1755  }
1756
1757  /**
1758   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1759   * must also be called.
   * @param isMap true to set the range for map tasks; false for reduce tasks
   * @param newValue a set of integer ranges of the task ids
1761   */
1762  public void setProfileTaskRange(boolean isMap, String newValue) {
    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
1765    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1766          newValue);
1767  }
1768
1769  /**
1770   * Set the debug script to run when the map tasks fail.
1771   * 
   * <p>The debug script can aid debugging of failed map tasks. The script is 
   * given the task's stdout, stderr, syslog and jobconf files as arguments.</p>
1774   * 
1775   * <p>The debug command, run on the node where the map failed, is:</p>
1776   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1778   * </pre></blockquote>
1779   * 
1780   * <p> The script file is distributed through {@link DistributedCache} 
1781   * APIs. The script needs to be symlinked. </p>
1782   * 
   * <p>Here is an example of how to submit a script:</p>
1784   * <p><blockquote><pre>
1785   * job.setMapDebugScript("./myscript");
1786   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1788   * </pre></blockquote>
1789   * 
1790   * @param mDbgScript the script name
1791   */
1792  public void  setMapDebugScript(String mDbgScript) {
1793    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1794  }
1795  
1796  /**
1797   * Get the map task's debug script.
1798   * 
1799   * @return the debug Script for the mapred job for failed map tasks.
1800   * @see #setMapDebugScript(String)
1801   */
1802  public String getMapDebugScript() {
1803    return get(JobContext.MAP_DEBUG_SCRIPT);
1804  }
1805  
1806  /**
1807   * Set the debug script to run when the reduce tasks fail.
1808   * 
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
1811   * 
   * <p>The debug command, run on the node where the reduce failed, is:</p>
1813   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
1815   * </pre></blockquote>
1816   * 
1817   * <p> The script file is distributed through {@link DistributedCache} 
   * APIs. The script file needs to be symlinked.</p>
1819   * 
   * <p>Here is an example of how to submit a script:</p>
1821   * <p><blockquote><pre>
1822   * job.setReduceDebugScript("./myscript");
1823   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1825   * </pre></blockquote>
1826   * 
1827   * @param rDbgScript the script name
1828   */
1829  public void  setReduceDebugScript(String rDbgScript) {
1830    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1831  }
1832  
1833  /**
   * Get the reduce task's debug script.
1835   * 
1836   * @return the debug script for the mapred job for failed reduce tasks.
1837   * @see #setReduceDebugScript(String)
1838   */
1839  public String getReduceDebugScript() {
1840    return get(JobContext.REDUCE_DEBUG_SCRIPT);
1841  }
1842
1843  /**
   * Get the uri to be invoked in order to send a notification after the job 
1845   * has completed (success/failure). 
1846   * 
1847   * @return the job end notification uri, <code>null</code> if it hasn't
1848   *         been set.
1849   * @see #setJobEndNotificationURI(String)
1850   */
1851  public String getJobEndNotificationURI() {
1852    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1853  }
1854
1855  /**
   * Set the uri to be invoked in order to send a notification after the job
1857   * has completed (success/failure).
1858   * 
1859   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
1860   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1861   * identifier and completion-status respectively.</p>
1862   * 
1863   * <p>This is typically used by application-writers to implement chaining of 
1864   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
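   * 
   * <p>For example (the host, port and path are illustrative only):</p>
   * <p><blockquote><pre>
   * job.setJobEndNotificationURI(
   *     "http://myhost:8080/notify?jobid=$jobId&amp;status=$jobStatus");
   * </pre></blockquote>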
1865   * 
1866   * @param uri the job end notification uri
1867   * @see JobStatus
1868   */
1869  public void setJobEndNotificationURI(String uri) {
1870    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1871  }
1872
1873  /**
1874   * Get job-specific shared directory for use as scratch space
1875   * 
1876   * <p>
1877   * When a job starts, a shared directory is created at location
1878   * <code>
1879   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
   * This directory is exposed to the users through the
   * <code>mapreduce.job.local.dir</code> property.
   * Tasks can use this space as scratch space and share files among 
   * themselves. </p>
   * This value is also available as a system property.
1885   * 
1886   * @return The localized job specific shared directory
1887   */
1888  public String getJobLocalDir() {
1889    return get(JobContext.JOB_LOCAL_DIR);
1890  }
1891
1892  /**
1893   * Get memory required to run a map task of the job, in MB.
1894   * 
1895   * If a value is specified in the configuration, it is returned.
1896   * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
1897   * <p>
1898   * For backward compatibility, if the job configuration sets the
1899   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1900   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1901   * after converting it from bytes to MB.
   * @return memory required to run a map task of the job, in MB.
1903   */
1904  public long getMemoryForMapTask() {
1905    long value = getDeprecatedMemoryValue();
1906    if (value < 0) {
1907      return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1908          JobContext.DEFAULT_MAP_MEMORY_MB);
1909    }
1910    return value;
1911  }
1912
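  /**
   * Set the memory required to run a map task of the job, in MB. The value
   * is written under both the current property name and the old M/R 1.x
   * property name so that older applications keep working.
   *
   * @param mem memory required to run a map task of the job, in MB.
   */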
1913  public void setMemoryForMapTask(long mem) {
1914    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1915    // In case that M/R 1.x applications use the old property name
1916    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1917  }
1918
1919  /**
1920   * Get memory required to run a reduce task of the job, in MB.
1921   * 
1922   * If a value is specified in the configuration, it is returned.
1923   * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
1924   * <p>
1925   * For backward compatibility, if the job configuration sets the
1926   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1927   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1928   * after converting it from bytes to MB.
1929   * @return memory required to run a reduce task of the job, in MB.
1930   */
1931  public long getMemoryForReduceTask() {
1932    long value = getDeprecatedMemoryValue();
1933    if (value < 0) {
1934      return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1935          JobContext.DEFAULT_REDUCE_MEMORY_MB);
1936    }
1937    return value;
1938  }
1939  
1940  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1941  // converted into MBs.
1942  // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1943  // value.
1944  private long getDeprecatedMemoryValue() {
1945    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1946        DISABLED_MEMORY_LIMIT);
1947    if (oldValue > 0) {
1948      oldValue /= (1024*1024);
1949    }
1950    return oldValue;
1951  }
1952
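  /**
   * Set the memory required to run a reduce task of the job, in MB. The
   * value is written under both the current property name and the old
   * M/R 1.x property name so that older applications keep working.
   *
   * @param mem memory required to run a reduce task of the job, in MB.
   */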
1953  public void setMemoryForReduceTask(long mem) {
1954    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1955    // In case that M/R 1.x applications use the old property name
1956    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1957  }
1958
1959  /**
1960   * Return the name of the queue to which this job is submitted.
1961   * Defaults to 'default'.
1962   * 
1963   * @return name of the queue
1964   */
1965  public String getQueueName() {
1966    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1967  }
1968  
1969  /**
1970   * Set the name of the queue to which this job should be submitted.
1971   * 
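   * <p>For example (the queue name is illustrative; available queues are
   * defined by the cluster's scheduler configuration):</p>
   * <p><blockquote><pre>
   * job.setQueueName("research");
   * </pre></blockquote>
   * 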
1972   * @param queueName Name of the queue
1973   */
1974  public void setQueueName(String queueName) {
1975    set(JobContext.QUEUE_NAME, queueName);
1976  }
1977  
1978  /**
   * Normalize negative memory configuration values to
   * <code>DISABLED_MEMORY_LIMIT</code>.
   * 
   * @param val the configured memory value
   * @return the normalized value
1983   */
1984  public static long normalizeMemoryConfigValue(long val) {
1985    if (val < 0) {
1986      val = DISABLED_MEMORY_LIMIT;
1987    }
1988    return val;
1989  }
1990
1991  /** 
1992   * Find a jar that contains a class of the same name, if any.
1993   * It will return a jar file, even if that is not the first thing
1994   * on the class path that has a class with the same name.
1995   * 
1996   * @param my_class the class to find.
1997   * @return a jar file that contains the class, or null.
1998   */
1999  public static String findContainingJar(Class my_class) {
2000    return ClassUtil.findContainingJar(my_class);
2001  }
2002
2003  /**
2004   * Get the memory required to run a task of this job, in bytes. See
2005   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2006   * <p>
2007   * This method is deprecated. Now, different memory limits can be
2008   * set for map and reduce tasks of a job, in MB. 
2009   * <p>
2010   * For backward compatibility, if the job configuration sets the
2011   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 
2012   * Otherwise, this method will return the larger of the values returned by 
2013   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
2014   * after converting them into bytes.
2015   *
2016   * @return Memory required to run a task of this job, in bytes.
2017   * @see #setMaxVirtualMemoryForTask(long)
2018   * @deprecated Use {@link #getMemoryForMapTask()} and
2019   *             {@link #getMemoryForReduceTask()}
2020   */
2021  @Deprecated
2022  public long getMaxVirtualMemoryForTask() {
2023    LOG.warn(
2024      "getMaxVirtualMemoryForTask() is deprecated. " +
2025      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
2026
2027    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
2028        Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
2029    return value;
2030  }
2031
2032  /**
2033   * Set the maximum amount of memory any task of this job can use. See
2034   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2035   * <p>
   * If <code>mapred.task.maxvmem</code> is not already set, the value is
   * split into <code>mapreduce.map.memory.mb</code> and
   * <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
   * <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
   * are in MB rather than bytes.
2042   *
2043   * @param vmem Maximum amount of virtual memory in bytes any task of this job
2044   *             can use.
2045   * @see #getMaxVirtualMemoryForTask()
   * @deprecated
   *  Use {@link #setMemoryForMapTask(long mem)} and
   *  {@link #setMemoryForReduceTask(long mem)}
2049   */
2050  @Deprecated
2051  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
      "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
2054    if (vmem < 0) {
2055      throw new IllegalArgumentException("Task memory allocation may not be < 0");
2056    }
2057
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024));    // converting bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
2064  }
2065
2066  /**
   * @deprecated this method is deprecated and no longer in use.
2068   */
2069  @Deprecated
2070  public long getMaxPhysicalMemoryForTask() {
2071    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2072              + " Refer to the APIs getMemoryForMapTask() and"
2073              + " getMemoryForReduceTask() for details.");
2074    return -1;
2075  }
2076
  /**
   * @deprecated this method is deprecated; the value set is ignored.
   */
2080  @Deprecated
2081  public void setMaxPhysicalMemoryForTask(long mem) {
2082    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
        + " The value set is ignored. Refer to"
        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2085  }
2086
2087  static String deprecatedString(String key) {
2088    return "The variable " + key + " is no longer used.";
2089  }
2090
  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
                + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
                + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
  }
2107  
2108
  /** For debugging: dumps the configuration to standard output in XML format. */
2110  public static void main(String[] args) throws Exception {
2111    new JobConf(new Configuration()).writeXml(System.out);
2112  }
2113
2114}
2115