001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    
022    import java.io.IOException;
023    import java.util.regex.Pattern;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceAudience.Private;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileStatus;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.LongWritable;
035    import org.apache.hadoop.io.RawComparator;
036    import org.apache.hadoop.io.Text;
037    import org.apache.hadoop.io.WritableComparable;
038    import org.apache.hadoop.io.WritableComparator;
039    import org.apache.hadoop.io.compress.CompressionCodec;
040    import org.apache.hadoop.mapred.lib.HashPartitioner;
041    import org.apache.hadoop.mapred.lib.IdentityMapper;
042    import org.apache.hadoop.mapred.lib.IdentityReducer;
043    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044    import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045    import org.apache.hadoop.mapreduce.MRConfig;
046    import org.apache.hadoop.mapreduce.MRJobConfig;
047    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048    import org.apache.hadoop.mapreduce.util.ConfigUtil;
049    import org.apache.hadoop.security.Credentials;
050    import org.apache.hadoop.util.ClassUtil;
051    import org.apache.hadoop.util.ReflectionUtils;
052    import org.apache.hadoop.util.Tool;
053    import org.apache.log4j.Level;
054    
055    /** 
056     * A map/reduce job configuration.
057     * 
058     * <p><code>JobConf</code> is the primary interface for a user to describe a 
059     * map-reduce job to the Hadoop framework for execution. The framework tries to
060     * faithfully execute the job as-is described by <code>JobConf</code>, however:
061     * <ol>
062     *   <li>
063     *   Some configuration parameters might have been marked as 
064     *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065     *   final</a> by administrators and hence cannot be altered.
066     *   </li>
067     *   <li>
068     *   While some job parameters are straight-forward to set 
069     *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
070     *   with the rest of the framework and/or job-configuration and is relatively
071     *   more complex for the user to control finely
072     *   (e.g. {@link #setNumMapTasks(int)}).
073     *   </li>
074     * </ol></p>
075     * 
076     * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
077     * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
078     * {@link OutputFormat} implementations to be used etc.
079     *
080     * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
081     * of the job such as <code>Comparator</code>s to be used, files to be put in  
082     * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
083     * are to be compressed (and how), debugability via user-provided scripts 
084     * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
085     * for doing post-processing on task logs, task's stdout, stderr, syslog. 
086     * and etc.</p>
087     * 
088     * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
089     * <p><blockquote><pre>
090     *     // Create a new JobConf
091     *     JobConf job = new JobConf(new Configuration(), MyJob.class);
092     *     
093     *     // Specify various job-specific parameters     
094     *     job.setJobName("myjob");
095     *     
096     *     FileInputFormat.setInputPaths(job, new Path("in"));
097     *     FileOutputFormat.setOutputPath(job, new Path("out"));
098     *     
099     *     job.setMapperClass(MyJob.MyMapper.class);
100     *     job.setCombinerClass(MyJob.MyReducer.class);
101     *     job.setReducerClass(MyJob.MyReducer.class);
102     *     
103     *     job.setInputFormat(SequenceFileInputFormat.class);
104     *     job.setOutputFormat(SequenceFileOutputFormat.class);
105     * </pre></blockquote></p>
106     * 
107     * @see JobClient
108     * @see ClusterStatus
109     * @see Tool
110     * @see DistributedCache
111     */
112    @InterfaceAudience.Public
113    @InterfaceStability.Stable
114    public class JobConf extends Configuration {
115    
116      private static final Log LOG = LogFactory.getLog(JobConf.class);
117    
118      static{
119        ConfigUtil.loadResources();
120      }
121    
122      /**
123       * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
124       * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
125       */
126      @Deprecated
127      public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
128        "mapred.task.maxvmem";
129    
130      /**
131       * @deprecated 
132       */
133      @Deprecated
134      public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
135        "mapred.task.limit.maxvmem";
136    
137      /**
138       * @deprecated
139       */
140      @Deprecated
141      public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
142        "mapred.task.default.maxvmem";
143    
144      /**
145       * @deprecated
146       */
147      @Deprecated
148      public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
149        "mapred.task.maxpmem";
150    
151      /**
152       * A value which if set for memory related configuration options,
153       * indicates that the options are turned off.
154       * Deprecated because it makes no sense in the context of MR2.
155       */
156      @Deprecated
157      public static final long DISABLED_MEMORY_LIMIT = -1L;
158    
159      /**
160       * Property name for the configuration property mapreduce.cluster.local.dir
161       */
162      public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
163    
164      /**
165       * Name of the queue to which jobs will be submitted, if no queue
166       * name is mentioned.
167       */
168      public static final String DEFAULT_QUEUE_NAME = "default";
169    
170      static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
171          JobContext.MAP_MEMORY_MB;
172    
173      static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
174        JobContext.REDUCE_MEMORY_MB;
175    
176      /**
177       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
178       * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
179       */
180      @Deprecated
181      public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
182          "mapred.job.map.memory.mb";
183    
184      /**
185       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
186       * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
187       */
188      @Deprecated
189      public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
190          "mapred.job.reduce.memory.mb";
191    
192      /** Pattern for the default unpacking behavior for job jars */
193      public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
194        Pattern.compile("(?:classes/|lib/).*");
195    
196      /**
197       * Configuration key to set the java command line options for the child
198       * map and reduce tasks.
199       * 
200       * Java opts for the task tracker child processes.
201       * The following symbol, if present, will be interpolated: @taskid@. 
202       * It is replaced by current TaskID. Any other occurrences of '@' will go 
203       * unchanged.
204       * For example, to enable verbose gc logging to a file named for the taskid in
205       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
206       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
207       * 
208       * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
209       * other environment variables to the child processes.
210       * 
211       * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
212       *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
213       */
214      @Deprecated
215      public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
216      
217      /**
218       * Configuration key to set the java command line options for the map tasks.
219       * 
220       * Java opts for the task tracker child map processes.
221       * The following symbol, if present, will be interpolated: @taskid@. 
222       * It is replaced by current TaskID. Any other occurrences of '@' will go 
223       * unchanged.
224       * For example, to enable verbose gc logging to a file named for the taskid in
225       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
226       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
227       * 
228       * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
229       * other environment variables to the map processes.
230       */
231      public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
232        JobContext.MAP_JAVA_OPTS;
233      
234      /**
235       * Configuration key to set the java command line options for the reduce tasks.
236       * 
237       * Java opts for the task tracker child reduce processes.
238       * The following symbol, if present, will be interpolated: @taskid@. 
239       * It is replaced by current TaskID. Any other occurrences of '@' will go 
240       * unchanged.
241       * For example, to enable verbose gc logging to a file named for the taskid in
242       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
243       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
244       * 
245       * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
246       * pass process environment variables to the reduce processes.
247       */
248      public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
249        JobContext.REDUCE_JAVA_OPTS;
250      
251      public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
252      
253      /**
254       * @deprecated
255       * Configuration key to set the maximum virtual memory available to the child
256       * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
257       * longer have any effect.
258       */
259      @Deprecated
260      public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
261    
262      /**
263       * @deprecated
264       * Configuration key to set the maximum virtual memory available to the
265       * map tasks (in kilo-bytes). This has been deprecated and will no
266       * longer have any effect.
267       */
268      @Deprecated
269      public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
270      
271      /**
272       * @deprecated
273       * Configuration key to set the maximum virtual memory available to the
274       * reduce tasks (in kilo-bytes). This has been deprecated and will no
275       * longer have any effect.
276       */
277      @Deprecated
278      public static final String MAPRED_REDUCE_TASK_ULIMIT =
279        "mapreduce.reduce.ulimit";
280    
281    
282      /**
283       * Configuration key to set the environment of the child map/reduce tasks.
284       * 
285       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
286       * reference existing environment variables via <code>$key</code> on
287       * Linux or <code>%key%</code> on Windows.
288       * 
289       * Example:
290       * <ul>
291       *   <li> A=foo - This will set the env variable A to foo. </li>
292       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
293       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
294       * </ul>
295       * 
296       * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
297       *                 {@link #MAPRED_REDUCE_TASK_ENV}
298       */
299      @Deprecated
300      public static final String MAPRED_TASK_ENV = "mapred.child.env";
301    
302      /**
303       * Configuration key to set the environment of the child map tasks.
304       * 
305       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
306       * reference existing environment variables via <code>$key</code> on
307       * Linux or <code>%key%</code> on Windows.
308       * 
309       * Example:
310       * <ul>
311       *   <li> A=foo - This will set the env variable A to foo. </li>
312       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
313       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
314       * </ul>
315       */
316      public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
317      
318      /**
319       * Configuration key to set the environment of the child reduce tasks.
320       * 
321       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
322       * reference existing environment variables via <code>$key</code> on
323       * Linux or <code>%key%</code> on Windows.
324       * 
325       * Example:
326       * <ul>
327       *   <li> A=foo - This will set the env variable A to foo. </li>
328       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
329       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
330       * </ul>
331       */
332      public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
333    
334      private Credentials credentials = new Credentials();
335      
336      /**
337       * Configuration key to set the logging {@link Level} for the map task.
338       *
339       * The allowed logging levels are:
340       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
341       */
342      public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
343        JobContext.MAP_LOG_LEVEL;
344      
345      /**
346       * Configuration key to set the logging {@link Level} for the reduce task.
347       *
348       * The allowed logging levels are:
349       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
350       */
351      public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
352        JobContext.REDUCE_LOG_LEVEL;
353      
354      /**
355       * Default logging level for map/reduce tasks.
356       */
357      public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
358    
359      /**
360       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
361       * use {@link MRJobConfig#WORKFLOW_ID} instead
362       */
363      @Deprecated
364      public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
365    
366      /**
367       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
368       * use {@link MRJobConfig#WORKFLOW_NAME} instead
369       */
370      @Deprecated
371      public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
372    
373      /**
374       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
375       * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
376       */
377      @Deprecated
378      public static final String WORKFLOW_NODE_NAME =
379          MRJobConfig.WORKFLOW_NODE_NAME;
380    
381      /**
382       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
383       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
384       */
385      @Deprecated
386      public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
387          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
388    
389      /**
390       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
391       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
392       */
393      @Deprecated
394      public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
395          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
396    
397      /**
398       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
399       * use {@link MRJobConfig#WORKFLOW_TAGS} instead
400       */
401      @Deprecated
402      public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
403    
404      /**
405       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
406       * not use it
407       */
408      @Deprecated
409      public static final String MAPREDUCE_RECOVER_JOB =
410          "mapreduce.job.restart.recover";
411    
412      /**
413       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
414       * not use it
415       */
416      @Deprecated
417      public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
418    
419      /**
420       * Construct a map/reduce job configuration.
421       */
422      public JobConf() {
423        checkAndWarnDeprecation();
424      }
425    
426      /** 
427       * Construct a map/reduce job configuration.
428       * 
429       * @param exampleClass a class whose containing jar is used as the job's jar.
430       */
431      public JobConf(Class exampleClass) {
432        setJarByClass(exampleClass);
433        checkAndWarnDeprecation();
434      }
435      
436      /**
437       * Construct a map/reduce job configuration.
438       * 
439       * @param conf a Configuration whose settings will be inherited.
440       */
441      public JobConf(Configuration conf) {
442        super(conf);
443        
444        if (conf instanceof JobConf) {
445          JobConf that = (JobConf)conf;
446          credentials = that.credentials;
447        }
448        
449        checkAndWarnDeprecation();
450      }
451    
452    
453      /** Construct a map/reduce job configuration.
454       * 
455       * @param conf a Configuration whose settings will be inherited.
456       * @param exampleClass a class whose containing jar is used as the job's jar.
457       */
458      public JobConf(Configuration conf, Class exampleClass) {
459        this(conf);
460        setJarByClass(exampleClass);
461      }
462    
463    
464      /** Construct a map/reduce configuration.
465       *
466       * @param config a Configuration-format XML job description file.
467       */
468      public JobConf(String config) {
469        this(new Path(config));
470      }
471    
472      /** Construct a map/reduce configuration.
473       *
474       * @param config a Configuration-format XML job description file.
475       */
476      public JobConf(Path config) {
477        super();
478        addResource(config);
479        checkAndWarnDeprecation();
480      }
481    
482      /** A new map/reduce configuration where the behavior of reading from the
483       * default resources can be turned off.
484       * <p/>
485       * If the parameter {@code loadDefaults} is false, the new instance
486       * will not load resources from the default files.
487       *
488       * @param loadDefaults specifies whether to load from the default files
489       */
490      public JobConf(boolean loadDefaults) {
491        super(loadDefaults);
492        checkAndWarnDeprecation();
493      }
494    
495      /**
496       * Get credentials for the job.
497       * @return credentials for the job
498       */
499      public Credentials getCredentials() {
500        return credentials;
501      }
502      
503      @Private
504      public void setCredentials(Credentials credentials) {
505        this.credentials = credentials;
506      }
507      
508      /**
509       * Get the user jar for the map-reduce job.
510       * 
511       * @return the user jar for the map-reduce job.
512       */
513      public String getJar() { return get(JobContext.JAR); }
514      
515      /**
516       * Set the user jar for the map-reduce job.
517       * 
518       * @param jar the user jar for the map-reduce job.
519       */
520      public void setJar(String jar) { set(JobContext.JAR, jar); }
521    
522      /**
523       * Get the pattern for jar contents to unpack on the tasktracker
524       */
525      public Pattern getJarUnpackPattern() {
526        return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
527      }
528    
529      
530      /**
531       * Set the job's jar file by finding an example class location.
532       * 
533       * @param cls the example class.
534       */
535      public void setJarByClass(Class cls) {
536        String jar = ClassUtil.findContainingJar(cls);
537        if (jar != null) {
538          setJar(jar);
539        }   
540      }
541    
542      public String[] getLocalDirs() throws IOException {
543        return getTrimmedStrings(MRConfig.LOCAL_DIR);
544      }
545    
546      /**
547       * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
548       */
549      @Deprecated
550      public void deleteLocalFiles() throws IOException {
551        String[] localDirs = getLocalDirs();
552        for (int i = 0; i < localDirs.length; i++) {
553          FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
554        }
555      }
556    
557      public void deleteLocalFiles(String subdir) throws IOException {
558        String[] localDirs = getLocalDirs();
559        for (int i = 0; i < localDirs.length; i++) {
560          FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
561        }
562      }
563    
564      /** 
565       * Constructs a local file name. Files are distributed among configured
566       * local directories.
567       */
568      public Path getLocalPath(String pathString) throws IOException {
569        return getLocalPath(MRConfig.LOCAL_DIR, pathString);
570      }
571    
572      /**
573       * Get the reported username for this job.
574       * 
575       * @return the username
576       */
577      public String getUser() {
578        return get(JobContext.USER_NAME);
579      }
580      
581      /**
582       * Set the reported username for this job.
583       * 
584       * @param user the username for this job.
585       */
586      public void setUser(String user) {
587        set(JobContext.USER_NAME, user);
588      }
589    
590    
591      
592      /**
593       * Set whether the framework should keep the intermediate files for 
594       * failed tasks.
595       * 
596       * @param keep <code>true</code> if framework should keep the intermediate files 
597       *             for failed tasks, <code>false</code> otherwise.
598       * 
599       */
600      public void setKeepFailedTaskFiles(boolean keep) {
601        setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
602      }
603      
604      /**
605       * Should the temporary files for failed tasks be kept?
606       * 
607       * @return should the files be kept?
608       */
609      public boolean getKeepFailedTaskFiles() {
610        return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
611      }
612      
613      /**
614       * Set a regular expression for task names that should be kept. 
615       * The regular expression ".*_m_000123_0" would keep the files
616       * for the first instance of map 123 that ran.
617       * 
618       * @param pattern the java.util.regex.Pattern to match against the 
619       *        task names.
620       */
621      public void setKeepTaskFilesPattern(String pattern) {
622        set(JobContext.PRESERVE_FILES_PATTERN, pattern);
623      }
624      
625      /**
626       * Get the regular expression that is matched against the task names
627       * to see if we need to keep the files.
628       * 
629       * @return the pattern as a string, if it was set, othewise null.
630       */
631      public String getKeepTaskFilesPattern() {
632        return get(JobContext.PRESERVE_FILES_PATTERN);
633      }
634      
635      /**
636       * Set the current working directory for the default file system.
637       * 
638       * @param dir the new current working directory.
639       */
640      public void setWorkingDirectory(Path dir) {
641        dir = new Path(getWorkingDirectory(), dir);
642        set(JobContext.WORKING_DIR, dir.toString());
643      }
644      
645      /**
646       * Get the current working directory for the default file system.
647       * 
648       * @return the directory name.
649       */
650      public Path getWorkingDirectory() {
651        String name = get(JobContext.WORKING_DIR);
652        if (name != null) {
653          return new Path(name);
654        } else {
655          try {
656            Path dir = FileSystem.get(this).getWorkingDirectory();
657            set(JobContext.WORKING_DIR, dir.toString());
658            return dir;
659          } catch (IOException e) {
660            throw new RuntimeException(e);
661          }
662        }
663      }
664      
665      /**
666       * Sets the number of tasks that a spawned task JVM should run
667       * before it exits
668       * @param numTasks the number of tasks to execute; defaults to 1;
669       * -1 signifies no limit
670       */
671      public void setNumTasksToExecutePerJvm(int numTasks) {
672        setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
673      }
674      
675      /**
676       * Get the number of tasks that a spawned JVM should execute
677       */
678      public int getNumTasksToExecutePerJvm() {
679        return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
680      }
681      
682      /**
683       * Get the {@link InputFormat} implementation for the map-reduce job,
684       * defaults to {@link TextInputFormat} if not specified explicity.
685       * 
686       * @return the {@link InputFormat} implementation for the map-reduce job.
687       */
688      public InputFormat getInputFormat() {
689        return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
690                                                                 TextInputFormat.class,
691                                                                 InputFormat.class),
692                                                        this);
693      }
694      
695      /**
696       * Set the {@link InputFormat} implementation for the map-reduce job.
697       * 
698       * @param theClass the {@link InputFormat} implementation for the map-reduce 
699       *                 job.
700       */
701      public void setInputFormat(Class<? extends InputFormat> theClass) {
702        setClass("mapred.input.format.class", theClass, InputFormat.class);
703      }
704      
705      /**
706       * Get the {@link OutputFormat} implementation for the map-reduce job,
707       * defaults to {@link TextOutputFormat} if not specified explicity.
708       * 
709       * @return the {@link OutputFormat} implementation for the map-reduce job.
710       */
711      public OutputFormat getOutputFormat() {
712        return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
713                                                                  TextOutputFormat.class,
714                                                                  OutputFormat.class),
715                                                         this);
716      }
717    
718      /**
719       * Get the {@link OutputCommitter} implementation for the map-reduce job,
720       * defaults to {@link FileOutputCommitter} if not specified explicitly.
721       * 
722       * @return the {@link OutputCommitter} implementation for the map-reduce job.
723       */
724      public OutputCommitter getOutputCommitter() {
725        return (OutputCommitter)ReflectionUtils.newInstance(
726          getClass("mapred.output.committer.class", FileOutputCommitter.class,
727                   OutputCommitter.class), this);
728      }
729    
730      /**
731       * Set the {@link OutputCommitter} implementation for the map-reduce job.
732       * 
733       * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
734       *                 job.
735       */
736      public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
737        setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
738      }
739      
740      /**
741       * Set the {@link OutputFormat} implementation for the map-reduce job.
742       * 
743       * @param theClass the {@link OutputFormat} implementation for the map-reduce 
744       *                 job.
745       */
746      public void setOutputFormat(Class<? extends OutputFormat> theClass) {
747        setClass("mapred.output.format.class", theClass, OutputFormat.class);
748      }
749    
750      /**
751       * Should the map outputs be compressed before transfer?
752       * 
753       * @param compress should the map outputs be compressed?
754       */
755      public void setCompressMapOutput(boolean compress) {
756        setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
757      }
758      
759      /**
760       * Are the outputs of the maps be compressed?
761       * 
762       * @return <code>true</code> if the outputs of the maps are to be compressed,
763       *         <code>false</code> otherwise.
764       */
765      public boolean getCompressMapOutput() {
766        return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
767      }
768    
769      /**
770       * Set the given class as the  {@link CompressionCodec} for the map outputs.
771       * 
772       * @param codecClass the {@link CompressionCodec} class that will compress  
773       *                   the map outputs.
774       */
775      public void 
776      setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
777        setCompressMapOutput(true);
778        setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
779                 CompressionCodec.class);
780      }
781      
782      /**
783       * Get the {@link CompressionCodec} for compressing the map outputs.
784       * 
785       * @param defaultValue the {@link CompressionCodec} to return if not set
786       * @return the {@link CompressionCodec} class that should be used to compress the 
787       *         map outputs.
788       * @throws IllegalArgumentException if the class was specified, but not found
789       */
790      public Class<? extends CompressionCodec> 
791      getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
792        Class<? extends CompressionCodec> codecClass = defaultValue;
793        String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
794        if (name != null) {
795          try {
796            codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
797          } catch (ClassNotFoundException e) {
798            throw new IllegalArgumentException("Compression codec " + name + 
799                                               " was not found.", e);
800          }
801        }
802        return codecClass;
803      }
804      
805      /**
806       * Get the key class for the map output data. If it is not set, use the
807       * (final) output key class. This allows the map output key class to be
808       * different than the final output key class.
809       *  
810       * @return the map output key class.
811       */
812      public Class<?> getMapOutputKeyClass() {
813        Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
814        if (retv == null) {
815          retv = getOutputKeyClass();
816        }
817        return retv;
818      }
819      
820      /**
821       * Set the key class for the map output data. This allows the user to
822       * specify the map output key class to be different than the final output
823       * value class.
824       * 
825       * @param theClass the map output key class.
826       */
827      public void setMapOutputKeyClass(Class<?> theClass) {
828        setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
829      }
830      
831      /**
832       * Get the value class for the map output data. If it is not set, use the
833       * (final) output value class This allows the map output value class to be
834       * different than the final output value class.
835       *  
836       * @return the map output value class.
837       */
838      public Class<?> getMapOutputValueClass() {
839        Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
840            Object.class);
841        if (retv == null) {
842          retv = getOutputValueClass();
843        }
844        return retv;
845      }
846      
847      /**
848       * Set the value class for the map output data. This allows the user to
849       * specify the map output value class to be different than the final output
850       * value class.
851       * 
852       * @param theClass the map output value class.
853       */
854      public void setMapOutputValueClass(Class<?> theClass) {
855        setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
856      }
857      
858      /**
859       * Get the key class for the job output data.
860       * 
861       * @return the key class for the job output data.
862       */
863      public Class<?> getOutputKeyClass() {
864        return getClass(JobContext.OUTPUT_KEY_CLASS,
865                        LongWritable.class, Object.class);
866      }
867      
868      /**
869       * Set the key class for the job output data.
870       * 
871       * @param theClass the key class for the job output data.
872       */
873      public void setOutputKeyClass(Class<?> theClass) {
874        setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
875      }
876    
877      /**
878       * Get the {@link RawComparator} comparator used to compare keys.
879       * 
880       * @return the {@link RawComparator} comparator used to compare keys.
881       */
882      public RawComparator getOutputKeyComparator() {
883        Class<? extends RawComparator> theClass = getClass(
884          JobContext.KEY_COMPARATOR, null, RawComparator.class);
885        if (theClass != null)
886          return ReflectionUtils.newInstance(theClass, this);
887        return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
888      }
889    
890      /**
891       * Set the {@link RawComparator} comparator used to compare keys.
892       * 
893       * @param theClass the {@link RawComparator} comparator used to 
894       *                 compare keys.
895       * @see #setOutputValueGroupingComparator(Class)                 
896       */
897      public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
898        setClass(JobContext.KEY_COMPARATOR,
899                 theClass, RawComparator.class);
900      }
901    
902      /**
903       * Set the {@link KeyFieldBasedComparator} options used to compare keys.
904       * 
905       * @param keySpec the key specification of the form -k pos1[,pos2], where,
906       *  pos is of the form f[.c][opts], where f is the number
907       *  of the key field to use, and c is the number of the first character from
908       *  the beginning of the field. Fields and character posns are numbered 
909       *  starting with 1; a character position of zero in pos2 indicates the
910       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
911       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
912       *  (the end of the field). opts are ordering options. The supported options
913       *  are:
914       *    -n, (Sort numerically)
915       *    -r, (Reverse the result of comparison)                 
916       */
917      public void setKeyFieldComparatorOptions(String keySpec) {
918        setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
919        set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
920      }
921      
922      /**
923       * Get the {@link KeyFieldBasedComparator} options
924       */
925      public String getKeyFieldComparatorOption() {
926        return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
927      }
928    
929      /**
930       * Set the {@link KeyFieldBasedPartitioner} options used for 
931       * {@link Partitioner}
932       * 
933       * @param keySpec the key specification of the form -k pos1[,pos2], where,
934       *  pos is of the form f[.c][opts], where f is the number
935       *  of the key field to use, and c is the number of the first character from
936       *  the beginning of the field. Fields and character posns are numbered 
937       *  starting with 1; a character position of zero in pos2 indicates the
938       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
939       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
940       *  (the end of the field).
941       */
942      public void setKeyFieldPartitionerOptions(String keySpec) {
943        setPartitionerClass(KeyFieldBasedPartitioner.class);
944        set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
945      }
946      
947      /**
948       * Get the {@link KeyFieldBasedPartitioner} options
949       */
950      public String getKeyFieldPartitionerOption() {
951        return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
952      }
953    
954      /**
955       * Get the user defined {@link WritableComparable} comparator for
956       * grouping keys of inputs to the combiner.
957       *
958       * @return comparator set by the user for grouping values.
959       * @see #setCombinerKeyGroupingComparator(Class) for details.
960       */
961      public RawComparator getCombinerKeyGroupingComparator() {
962        Class<? extends RawComparator> theClass = getClass(
963            JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
964        if (theClass == null) {
965          return getOutputKeyComparator();
966        }
967    
968        return ReflectionUtils.newInstance(theClass, this);
969      }
970    
971      /** 
972       * Get the user defined {@link WritableComparable} comparator for 
973       * grouping keys of inputs to the reduce.
974       * 
975       * @return comparator set by the user for grouping values.
976       * @see #setOutputValueGroupingComparator(Class) for details.
977       */
978      public RawComparator getOutputValueGroupingComparator() {
979        Class<? extends RawComparator> theClass = getClass(
980          JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
981        if (theClass == null) {
982          return getOutputKeyComparator();
983        }
984        
985        return ReflectionUtils.newInstance(theClass, this);
986      }
987    
988      /**
989       * Set the user defined {@link RawComparator} comparator for
990       * grouping keys in the input to the combiner.
991       * <p/>
992       * <p>This comparator should be provided if the equivalence rules for keys
993       * for sorting the intermediates are different from those for grouping keys
994       * before each call to
995       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
996       * <p/>
997       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
998       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
999       * <p/>
1000       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1001       * how keys are sorted, this can be used in conjunction to simulate
1002       * <i>secondary sort on values</i>.</p>
1003       * <p/>
1004       * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1005       * <i>stable</i> in any sense. (In any case, with the order of available
1006       * map-outputs to the combiner being non-deterministic, it wouldn't make
1007       * that much sense.)</p>
1008       *
1009       * @param theClass the comparator class to be used for grouping keys for the
1010       * combiner. It should implement <code>RawComparator</code>.
1011       * @see #setOutputKeyComparatorClass(Class)
1012       */
1013      public void setCombinerKeyGroupingComparator(
1014          Class<? extends RawComparator> theClass) {
1015        setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1016            theClass, RawComparator.class);
1017      }
1018    
1019      /** 
1020       * Set the user defined {@link RawComparator} comparator for 
1021       * grouping keys in the input to the reduce.
1022       * 
1023       * <p>This comparator should be provided if the equivalence rules for keys
1024       * for sorting the intermediates are different from those for grouping keys
1025       * before each call to 
1026       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1027       *  
1028       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1029       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1030       * 
1031       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
1032       * how keys are sorted, this can be used in conjunction to simulate 
1033       * <i>secondary sort on values</i>.</p>
1034       *  
1035       * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
1036       * <i>stable</i> in any sense. (In any case, with the order of available 
1037       * map-outputs to the reduce being non-deterministic, it wouldn't make 
1038       * that much sense.)</p>
1039       * 
1040       * @param theClass the comparator class to be used for grouping keys. 
1041       *                 It should implement <code>RawComparator</code>.
1042       * @see #setOutputKeyComparatorClass(Class)
1043       * @see #setCombinerKeyGroupingComparator(Class)
1044       */
1045      public void setOutputValueGroupingComparator(
1046          Class<? extends RawComparator> theClass) {
1047        setClass(JobContext.GROUP_COMPARATOR_CLASS,
1048                 theClass, RawComparator.class);
1049      }
1050    
1051      /**
1052       * Should the framework use the new context-object code for running
1053       * the mapper?
1054       * @return true, if the new api should be used
1055       */
1056      public boolean getUseNewMapper() {
1057        return getBoolean("mapred.mapper.new-api", false);
1058      }
1059      /**
1060       * Set whether the framework should use the new api for the mapper.
1061       * This is the default for jobs submitted with the new Job api.
1062       * @param flag true, if the new api should be used
1063       */
1064      public void setUseNewMapper(boolean flag) {
1065        setBoolean("mapred.mapper.new-api", flag);
1066      }
1067    
1068      /**
1069       * Should the framework use the new context-object code for running
1070       * the reducer?
1071       * @return true, if the new api should be used
1072       */
1073      public boolean getUseNewReducer() {
1074        return getBoolean("mapred.reducer.new-api", false);
1075      }
1076      /**
1077       * Set whether the framework should use the new api for the reducer. 
1078       * This is the default for jobs submitted with the new Job api.
1079       * @param flag true, if the new api should be used
1080       */
1081      public void setUseNewReducer(boolean flag) {
1082        setBoolean("mapred.reducer.new-api", flag);
1083      }
1084    
1085      /**
1086       * Get the value class for job outputs.
1087       * 
1088       * @return the value class for job outputs.
1089       */
1090      public Class<?> getOutputValueClass() {
1091        return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1092      }
1093      
1094      /**
1095       * Set the value class for job outputs.
1096       * 
1097       * @param theClass the value class for job outputs.
1098       */
1099      public void setOutputValueClass(Class<?> theClass) {
1100        setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1101      }
1102    
1103      /**
1104       * Get the {@link Mapper} class for the job.
1105       * 
1106       * @return the {@link Mapper} class for the job.
1107       */
1108      public Class<? extends Mapper> getMapperClass() {
1109        return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1110      }
1111      
1112      /**
1113       * Set the {@link Mapper} class for the job.
1114       * 
1115       * @param theClass the {@link Mapper} class for the job.
1116       */
1117      public void setMapperClass(Class<? extends Mapper> theClass) {
1118        setClass("mapred.mapper.class", theClass, Mapper.class);
1119      }
1120    
1121      /**
1122       * Get the {@link MapRunnable} class for the job.
1123       * 
1124       * @return the {@link MapRunnable} class for the job.
1125       */
1126      public Class<? extends MapRunnable> getMapRunnerClass() {
1127        return getClass("mapred.map.runner.class",
1128                        MapRunner.class, MapRunnable.class);
1129      }
1130      
1131      /**
1132       * Expert: Set the {@link MapRunnable} class for the job.
1133       * 
1134       * Typically used to exert greater control on {@link Mapper}s.
1135       * 
1136       * @param theClass the {@link MapRunnable} class for the job.
1137       */
1138      public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1139        setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1140      }
1141    
1142      /**
1143       * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
1144       * to be sent to the {@link Reducer}s.
1145       * 
1146       * @return the {@link Partitioner} used to partition map-outputs.
1147       */
1148      public Class<? extends Partitioner> getPartitionerClass() {
1149        return getClass("mapred.partitioner.class",
1150                        HashPartitioner.class, Partitioner.class);
1151      }
1152      
1153      /**
1154       * Set the {@link Partitioner} class used to partition 
1155       * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1156       * 
1157       * @param theClass the {@link Partitioner} used to partition map-outputs.
1158       */
1159      public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1160        setClass("mapred.partitioner.class", theClass, Partitioner.class);
1161      }
1162    
1163      /**
1164       * Get the {@link Reducer} class for the job.
1165       * 
1166       * @return the {@link Reducer} class for the job.
1167       */
1168      public Class<? extends Reducer> getReducerClass() {
1169        return getClass("mapred.reducer.class",
1170                        IdentityReducer.class, Reducer.class);
1171      }
1172      
1173      /**
1174       * Set the {@link Reducer} class for the job.
1175       * 
1176       * @param theClass the {@link Reducer} class for the job.
1177       */
1178      public void setReducerClass(Class<? extends Reducer> theClass) {
1179        setClass("mapred.reducer.class", theClass, Reducer.class);
1180      }
1181    
1182      /**
1183       * Get the user-defined <i>combiner</i> class used to combine map-outputs 
1184       * before being sent to the reducers. Typically the combiner is same as the
1185       * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1186       * 
1187       * @return the user-defined combiner class used to combine map-outputs.
1188       */
1189      public Class<? extends Reducer> getCombinerClass() {
1190        return getClass("mapred.combiner.class", null, Reducer.class);
1191      }
1192    
1193      /**
1194       * Set the user-defined <i>combiner</i> class used to combine map-outputs 
1195       * before being sent to the reducers. 
1196       * 
1197       * <p>The combiner is an application-specified aggregation operation, which
1198       * can help cut down the amount of data transferred between the 
1199       * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1200       * 
1201       * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1202       * the mapper and reducer tasks. In general, the combiner is called as the
1203       * sort/merge result is written to disk. The combiner must:
1204       * <ul>
1205       *   <li> be side-effect free</li>
1206       *   <li> have the same input and output key types and the same input and 
1207       *        output value types</li>
1208       * </ul></p>
1209       * 
1210       * <p>Typically the combiner is same as the <code>Reducer</code> for the  
1211       * job i.e. {@link #setReducerClass(Class)}.</p>
1212       * 
1213       * @param theClass the user-defined combiner class used to combine 
1214       *                 map-outputs.
1215       */
1216      public void setCombinerClass(Class<? extends Reducer> theClass) {
1217        setClass("mapred.combiner.class", theClass, Reducer.class);
1218      }
1219      
1220      /**
1221       * Should speculative execution be used for this job? 
1222       * Defaults to <code>true</code>.
1223       * 
1224       * @return <code>true</code> if speculative execution be used for this job,
1225       *         <code>false</code> otherwise.
1226       */
1227      public boolean getSpeculativeExecution() { 
1228        return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1229      }
1230      
1231      /**
1232       * Turn speculative execution on or off for this job. 
1233       * 
1234       * @param speculativeExecution <code>true</code> if speculative execution 
1235       *                             should be turned on, else <code>false</code>.
1236       */
1237      public void setSpeculativeExecution(boolean speculativeExecution) {
1238        setMapSpeculativeExecution(speculativeExecution);
1239        setReduceSpeculativeExecution(speculativeExecution);
1240      }
1241    
1242      /**
1243       * Should speculative execution be used for this job for map tasks? 
1244       * Defaults to <code>true</code>.
1245       * 
1246       * @return <code>true</code> if speculative execution be 
1247       *                           used for this job for map tasks,
1248       *         <code>false</code> otherwise.
1249       */
1250      public boolean getMapSpeculativeExecution() { 
1251        return getBoolean(JobContext.MAP_SPECULATIVE, true);
1252      }
1253      
1254      /**
1255       * Turn speculative execution on or off for this job for map tasks. 
1256       * 
1257       * @param speculativeExecution <code>true</code> if speculative execution 
1258       *                             should be turned on for map tasks,
1259       *                             else <code>false</code>.
1260       */
1261      public void setMapSpeculativeExecution(boolean speculativeExecution) {
1262        setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1263      }
1264    
1265      /**
1266       * Should speculative execution be used for this job for reduce tasks? 
1267       * Defaults to <code>true</code>.
1268       * 
1269       * @return <code>true</code> if speculative execution be used 
1270       *                           for reduce tasks for this job,
1271       *         <code>false</code> otherwise.
1272       */
1273      public boolean getReduceSpeculativeExecution() { 
1274        return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1275      }
1276      
1277      /**
1278       * Turn speculative execution on or off for this job for reduce tasks. 
1279       * 
1280       * @param speculativeExecution <code>true</code> if speculative execution 
1281       *                             should be turned on for reduce tasks,
1282       *                             else <code>false</code>.
1283       */
1284      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1285        setBoolean(JobContext.REDUCE_SPECULATIVE, 
1286                   speculativeExecution);
1287      }
1288    
1289      /**
1290       * Get configured the number of reduce tasks for this job.
1291       * Defaults to <code>1</code>.
1292       * 
1293       * @return the number of reduce tasks for this job.
1294       */
1295      public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1296      
1297      /**
1298       * Set the number of map tasks for this job.
1299       * 
1300       * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
1301       * number of spawned map tasks depends on the number of {@link InputSplit}s 
1302       * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1303       *  
1304       * A custom {@link InputFormat} is typically used to accurately control 
1305       * the number of map tasks for the job.</p>
1306       * 
1307       * <h4 id="NoOfMaps">How many maps?</h4>
1308       * 
1309       * <p>The number of maps is usually driven by the total size of the inputs 
1310       * i.e. total number of blocks of the input files.</p>
1311       *  
1312       * <p>The right level of parallelism for maps seems to be around 10-100 maps 
1313       * per-node, although it has been set up to 300 or so for very cpu-light map 
1314       * tasks. Task setup takes awhile, so it is best if the maps take at least a 
1315       * minute to execute.</p>
1316       * 
1317       * <p>The default behavior of file-based {@link InputFormat}s is to split the 
1318       * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
1319       * bytes, of input files. However, the {@link FileSystem} blocksize of the 
1320       * input files is treated as an upper bound for input splits. A lower bound 
1321       * on the split size can be set via 
1322       * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1323       * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1324       *  
1325       * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
1326       * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
1327       * used to set it even higher.</p>
1328       * 
1329       * @param n the number of map tasks for this job.
1330       * @see InputFormat#getSplits(JobConf, int)
1331       * @see FileInputFormat
1332       * @see FileSystem#getDefaultBlockSize()
1333       * @see FileStatus#getBlockSize()
1334       */
1335      public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1336    
1337      /**
1338       * Get configured the number of reduce tasks for this job. Defaults to 
1339       * <code>1</code>.
1340       * 
1341       * @return the number of reduce tasks for this job.
1342       */
1343      public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1344      
1345      /**
1346       * Set the requisite number of reduce tasks for this job.
1347       * 
1348       * <h4 id="NoOfReduces">How many reduces?</h4>
1349       * 
1350       * <p>The right number of reduces seems to be <code>0.95</code> or 
1351       * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
1352       * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1353       * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1354       * </p>
1355       * 
1356       * <p>With <code>0.95</code> all of the reduces can launch immediately and 
1357       * start transfering map outputs as the maps finish. With <code>1.75</code> 
1358       * the faster nodes will finish their first round of reduces and launch a 
1359       * second wave of reduces doing a much better job of load balancing.</p>
1360       * 
1361       * <p>Increasing the number of reduces increases the framework overhead, but 
1362       * increases load balancing and lowers the cost of failures.</p>
1363       * 
1364       * <p>The scaling factors above are slightly less than whole numbers to 
1365       * reserve a few reduce slots in the framework for speculative-tasks, failures
1366       * etc.</p> 
1367       *
1368       * <h4 id="ReducerNone">Reducer NONE</h4>
1369       * 
1370       * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1371       * 
1372       * <p>In this case the output of the map-tasks directly go to distributed 
1373       * file-system, to the path set by 
1374       * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
1375       * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1376       * 
1377       * @param n the number of reduce tasks for this job.
1378       */
1379      public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1380      
1381      /** 
1382       * Get the configured number of maximum attempts that will be made to run a
1383       * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1384       * property. If this property is not already set, the default is 4 attempts.
1385       *  
1386       * @return the max number of attempts per map task.
1387       */
1388      public int getMaxMapAttempts() {
1389        return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1390      }
1391      
1392      /** 
1393       * Expert: Set the number of maximum attempts that will be made to run a
1394       * map task.
1395       * 
1396       * @param n the number of attempts per map task.
1397       */
1398      public void setMaxMapAttempts(int n) {
1399        setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1400      }
1401    
1402      /** 
1403       * Get the configured number of maximum attempts  that will be made to run a
1404       * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1405       * property. If this property is not already set, the default is 4 attempts.
1406       * 
1407       * @return the max number of attempts per reduce task.
1408       */
1409      public int getMaxReduceAttempts() {
1410        return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1411      }
1412      /** 
1413       * Expert: Set the number of maximum attempts that will be made to run a
1414       * reduce task.
1415       * 
1416       * @param n the number of attempts per reduce task.
1417       */
1418      public void setMaxReduceAttempts(int n) {
1419        setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1420      }
1421      
1422      /**
1423       * Get the user-specified job name. This is only used to identify the 
1424       * job to the user.
1425       * 
1426       * @return the job's name, defaulting to "".
1427       */
1428      public String getJobName() {
1429        return get(JobContext.JOB_NAME, "");
1430      }
1431      
1432      /**
1433       * Set the user-specified job name.
1434       * 
1435       * @param name the job's new name.
1436       */
1437      public void setJobName(String name) {
1438        set(JobContext.JOB_NAME, name);
1439      }
1440      
1441      /**
1442       * Get the user-specified session identifier. The default is the empty string.
1443       *
1444       * The session identifier is used to tag metric data that is reported to some
1445       * performance metrics system via the org.apache.hadoop.metrics API.  The 
1446       * session identifier is intended, in particular, for use by Hadoop-On-Demand 
1447       * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
1448       * HOD will set the session identifier by modifying the mapred-site.xml file 
1449       * before starting the cluster.
1450       *
1451       * When not running under HOD, this identifer is expected to remain set to 
1452       * the empty string.
1453       *
1454       * @return the session identifier, defaulting to "".
1455       */
1456      @Deprecated
1457      public String getSessionId() {
1458          return get("session.id", "");
1459      }
1460      
1461      /**
1462       * Set the user-specified session identifier.  
1463       *
1464       * @param sessionId the new session id.
1465       */
1466      @Deprecated
1467      public void setSessionId(String sessionId) {
1468          set("session.id", sessionId);
1469      }
1470        
1471      /**
1472       * Set the maximum no. of failures of a given job per tasktracker.
1473       * If the no. of task failures exceeds <code>noFailures</code>, the 
1474       * tasktracker is <i>blacklisted</i> for this job. 
1475       * 
1476       * @param noFailures maximum no. of failures of a given job per tasktracker.
1477       */
1478      public void setMaxTaskFailuresPerTracker(int noFailures) {
1479        setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1480      }
1481      
1482      /**
1483       * Expert: Get the maximum no. of failures of a given job per tasktracker.
1484       * If the no. of task failures exceeds this, the tasktracker is
1485       * <i>blacklisted</i> for this job. 
1486       * 
1487       * @return the maximum no. of failures of a given job per tasktracker.
1488       */
1489      public int getMaxTaskFailuresPerTracker() {
1490        return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1491      }
1492    
1493      /**
1494       * Get the maximum percentage of map tasks that can fail without 
1495       * the job being aborted. 
1496       * 
1497       * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1498       * attempts before being declared as <i>failed</i>.
1499       *  
1500       * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1501       * the job being declared as {@link JobStatus#FAILED}.
1502       * 
1503       * @return the maximum percentage of map tasks that can fail without
1504       *         the job being aborted.
1505       */
1506      public int getMaxMapTaskFailuresPercent() {
1507        return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1508      }
1509    
1510      /**
1511       * Expert: Set the maximum percentage of map tasks that can fail without the
1512       * job being aborted. 
1513       * 
1514       * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1515       * before being declared as <i>failed</i>.
1516       * 
1517       * @param percent the maximum percentage of map tasks that can fail without 
1518       *                the job being aborted.
1519       */
1520      public void setMaxMapTaskFailuresPercent(int percent) {
1521        setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1522      }
1523      
1524      /**
1525       * Get the maximum percentage of reduce tasks that can fail without 
1526       * the job being aborted. 
1527       * 
1528       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1529       * attempts before being declared as <i>failed</i>.
1530       * 
1531       * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1532       * in the job being declared as {@link JobStatus#FAILED}.
1533       * 
1534       * @return the maximum percentage of reduce tasks that can fail without
1535       *         the job being aborted.
1536       */
1537      public int getMaxReduceTaskFailuresPercent() {
1538        return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1539      }
1540      
1541      /**
1542       * Set the maximum percentage of reduce tasks that can fail without the job
1543       * being aborted.
1544       * 
1545       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1546       * attempts before being declared as <i>failed</i>.
1547       * 
1548       * @param percent the maximum percentage of reduce tasks that can fail without 
1549       *                the job being aborted.
1550       */
1551      public void setMaxReduceTaskFailuresPercent(int percent) {
1552        setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1553      }
1554      
1555      /**
1556       * Set {@link JobPriority} for this job.
1557       * 
1558       * @param prio the {@link JobPriority} for this job.
1559       */
1560      public void setJobPriority(JobPriority prio) {
1561        set(JobContext.PRIORITY, prio.toString());
1562      }
1563      
1564      /**
1565       * Get the {@link JobPriority} for this job.
1566       * 
1567       * @return the {@link JobPriority} for this job.
1568       */
1569      public JobPriority getJobPriority() {
1570        String prio = get(JobContext.PRIORITY);
1571        if(prio == null) {
1572          return JobPriority.NORMAL;
1573        }
1574        
1575        return JobPriority.valueOf(prio);
1576      }
1577    
1578      /**
1579       * Set JobSubmitHostName for this job.
1580       * 
1581       * @param hostname the JobSubmitHostName for this job.
1582       */
1583      void setJobSubmitHostName(String hostname) {
1584        set(MRJobConfig.JOB_SUBMITHOST, hostname);
1585      }
1586      
1587      /**
1588       * Get the  JobSubmitHostName for this job.
1589       * 
1590       * @return the JobSubmitHostName for this job.
1591       */
1592      String getJobSubmitHostName() {
1593        String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1594        
1595        return hostname;
1596      }
1597    
1598      /**
1599       * Set JobSubmitHostAddress for this job.
1600       * 
1601       * @param hostadd the JobSubmitHostAddress for this job.
1602       */
1603      void setJobSubmitHostAddress(String hostadd) {
1604        set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1605      }
1606      
1607      /**
1608       * Get JobSubmitHostAddress for this job.
1609       * 
1610       * @return  JobSubmitHostAddress for this job.
1611       */
1612      String getJobSubmitHostAddress() {
1613        String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1614        
1615        return hostadd;
1616      }
1617    
1618      /**
1619       * Get whether the task profiling is enabled.
1620       * @return true if some tasks will be profiled
1621       */
1622      public boolean getProfileEnabled() {
1623        return getBoolean(JobContext.TASK_PROFILE, false);
1624      }
1625    
1626      /**
1627       * Set whether the system should collect profiler information for some of 
1628       * the tasks in this job? The information is stored in the user log 
1629       * directory.
1630       * @param newValue true means it should be gathered
1631       */
1632      public void setProfileEnabled(boolean newValue) {
1633        setBoolean(JobContext.TASK_PROFILE, newValue);
1634      }
1635    
1636      /**
1637       * Get the profiler configuration arguments.
1638       *
1639       * The default value for this property is
1640       * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1641       * 
1642       * @return the parameters to pass to the task child to configure profiling
1643       */
1644      public String getProfileParams() {
1645        return get(JobContext.TASK_PROFILE_PARAMS,
1646            MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
1647      }
1648    
1649      /**
1650       * Set the profiler configuration arguments. If the string contains a '%s' it
1651       * will be replaced with the name of the profiling output file when the task
1652       * runs.
1653       *
1654       * This value is passed to the task child JVM on the command line.
1655       *
1656       * @param value the configuration string
1657       */
1658      public void setProfileParams(String value) {
1659        set(JobContext.TASK_PROFILE_PARAMS, value);
1660      }
1661    
1662      /**
1663       * Get the range of maps or reduces to profile.
1664       * @param isMap is the task a map?
1665       * @return the task ranges
1666       */
1667      public IntegerRanges getProfileTaskRange(boolean isMap) {
1668        return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1669                           JobContext.NUM_REDUCE_PROFILES), "0-2");
1670      }
1671    
1672      /**
1673       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1674       * must also be called.
1675       * @param newValue a set of integer ranges of the map ids
1676       */
1677      public void setProfileTaskRange(boolean isMap, String newValue) {
1678        // parse the value to make sure it is legal
1679          new Configuration.IntegerRanges(newValue);
1680        set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1681              newValue);
1682      }
1683    
1684      /**
1685       * Set the debug script to run when the map tasks fail.
1686       * 
1687       * <p>The debug script can aid debugging of failed map tasks. The script is 
1688       * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1689       * 
1690       * <p>The debug command, run on the node where the map failed, is:</p>
1691       * <p><pre><blockquote> 
1692       * $script $stdout $stderr $syslog $jobconf.
1693       * </blockquote></pre></p>
1694       * 
1695       * <p> The script file is distributed through {@link DistributedCache} 
1696       * APIs. The script needs to be symlinked. </p>
1697       * 
1698       * <p>Here is an example on how to submit a script 
1699       * <p><blockquote><pre>
1700       * job.setMapDebugScript("./myscript");
1701       * DistributedCache.createSymlink(job);
1702       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1703       * </pre></blockquote></p>
1704       * 
1705       * @param mDbgScript the script name
1706       */
1707      public void  setMapDebugScript(String mDbgScript) {
1708        set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1709      }
1710      
1711      /**
1712       * Get the map task's debug script.
1713       * 
1714       * @return the debug Script for the mapred job for failed map tasks.
1715       * @see #setMapDebugScript(String)
1716       */
1717      public String getMapDebugScript() {
1718        return get(JobContext.MAP_DEBUG_SCRIPT);
1719      }
1720      
1721      /**
1722       * Set the debug script to run when the reduce tasks fail.
1723       * 
1724       * <p>The debug script can aid debugging of failed reduce tasks. The script
1725       * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1726       * 
1727       * <p>The debug command, run on the node where the map failed, is:</p>
1728       * <p><pre><blockquote> 
1729       * $script $stdout $stderr $syslog $jobconf.
1730       * </blockquote></pre></p>
1731       * 
1732       * <p> The script file is distributed through {@link DistributedCache} 
1733       * APIs. The script file needs to be symlinked </p>
1734       * 
1735       * <p>Here is an example on how to submit a script 
1736       * <p><blockquote><pre>
1737       * job.setReduceDebugScript("./myscript");
1738       * DistributedCache.createSymlink(job);
1739       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1740       * </pre></blockquote></p>
1741       * 
1742       * @param rDbgScript the script name
1743       */
1744      public void  setReduceDebugScript(String rDbgScript) {
1745        set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1746      }
1747      
1748      /**
1749       * Get the reduce task's debug Script
1750       * 
1751       * @return the debug script for the mapred job for failed reduce tasks.
1752       * @see #setReduceDebugScript(String)
1753       */
1754      public String getReduceDebugScript() {
1755        return get(JobContext.REDUCE_DEBUG_SCRIPT);
1756      }
1757    
1758      /**
1759       * Get the uri to be invoked in-order to send a notification after the job 
1760       * has completed (success/failure). 
1761       * 
1762       * @return the job end notification uri, <code>null</code> if it hasn't
1763       *         been set.
1764       * @see #setJobEndNotificationURI(String)
1765       */
1766      public String getJobEndNotificationURI() {
1767        return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1768      }
1769    
1770      /**
1771       * Set the uri to be invoked in-order to send a notification after the job
1772       * has completed (success/failure).
1773       * 
1774       * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
1775       * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1776       * identifier and completion-status respectively.</p>
1777       * 
1778       * <p>This is typically used by application-writers to implement chaining of 
1779       * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1780       * 
1781       * @param uri the job end notification uri
1782       * @see JobStatus
1783       * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1784       *       JobCompletionAndChaining">Job Completion and Chaining</a>
1785       */
1786      public void setJobEndNotificationURI(String uri) {
1787        set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1788      }
1789    
1790      /**
1791       * Get job-specific shared directory for use as scratch space
1792       * 
1793       * <p>
1794       * When a job starts, a shared directory is created at location
1795       * <code>
1796       * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1797       * This directory is exposed to the users through 
1798       * <code>mapreduce.job.local.dir </code>.
1799       * So, the tasks can use this space 
1800       * as scratch space and share files among them. </p>
1801       * This value is available as System property also.
1802       * 
1803       * @return The localized job specific shared directory
1804       */
1805      public String getJobLocalDir() {
1806        return get(JobContext.JOB_LOCAL_DIR);
1807      }
1808    
1809      /**
1810       * Get memory required to run a map task of the job, in MB.
1811       * 
1812       * If a value is specified in the configuration, it is returned.
1813       * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
1814       * <p/>
1815       * For backward compatibility, if the job configuration sets the
1816       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1817       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1818       * after converting it from bytes to MB.
1819       * @return memory required to run a map task of the job, in MB,
1820       */
1821      public long getMemoryForMapTask() {
1822        long value = getDeprecatedMemoryValue();
1823        if (value < 0) {
1824          return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1825              JobContext.DEFAULT_MAP_MEMORY_MB);
1826        }
1827        return value;
1828      }
1829    
1830      public void setMemoryForMapTask(long mem) {
1831        setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1832        // In case that M/R 1.x applications use the old property name
1833        setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1834      }
1835    
1836      /**
1837       * Get memory required to run a reduce task of the job, in MB.
1838       * 
1839       * If a value is specified in the configuration, it is returned.
1840       * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
1841       * <p/>
1842       * For backward compatibility, if the job configuration sets the
1843       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1844       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1845       * after converting it from bytes to MB.
1846       * @return memory required to run a reduce task of the job, in MB.
1847       */
1848      public long getMemoryForReduceTask() {
1849        long value = getDeprecatedMemoryValue();
1850        if (value < 0) {
1851          return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1852              JobContext.DEFAULT_REDUCE_MEMORY_MB);
1853        }
1854        return value;
1855      }
1856      
1857      // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1858      // converted into MBs.
1859      // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1860      // value.
1861      private long getDeprecatedMemoryValue() {
1862        long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1863            DISABLED_MEMORY_LIMIT);
1864        if (oldValue > 0) {
1865          oldValue /= (1024*1024);
1866        }
1867        return oldValue;
1868      }
1869    
1870      public void setMemoryForReduceTask(long mem) {
1871        setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1872        // In case that M/R 1.x applications use the old property name
1873        setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1874      }
1875    
1876      /**
1877       * Return the name of the queue to which this job is submitted.
1878       * Defaults to 'default'.
1879       * 
1880       * @return name of the queue
1881       */
1882      public String getQueueName() {
1883        return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1884      }
1885      
1886      /**
1887       * Set the name of the queue to which this job should be submitted.
1888       * 
1889       * @param queueName Name of the queue
1890       */
1891      public void setQueueName(String queueName) {
1892        set(JobContext.QUEUE_NAME, queueName);
1893      }
1894      
1895      /**
1896       * Normalize the negative values in configuration
1897       * 
1898       * @param val
1899       * @return normalized value
1900       */
1901      public static long normalizeMemoryConfigValue(long val) {
1902        if (val < 0) {
1903          val = DISABLED_MEMORY_LIMIT;
1904        }
1905        return val;
1906      }
1907    
1908      /** 
1909       * Find a jar that contains a class of the same name, if any.
1910       * It will return a jar file, even if that is not the first thing
1911       * on the class path that has a class with the same name.
1912       * 
1913       * @param my_class the class to find.
1914       * @return a jar file that contains the class, or null.
1915       * @throws IOException
1916       */
1917      public static String findContainingJar(Class my_class) {
1918        return ClassUtil.findContainingJar(my_class);
1919      }
1920    
1921      /**
1922       * Get the memory required to run a task of this job, in bytes. See
1923       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1924       * <p/>
1925       * This method is deprecated. Now, different memory limits can be
1926       * set for map and reduce tasks of a job, in MB. 
1927       * <p/>
1928       * For backward compatibility, if the job configuration sets the
1929       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 
1930       * Otherwise, this method will return the larger of the values returned by 
1931       * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1932       * after converting them into bytes.
1933       *
1934       * @return Memory required to run a task of this job, in bytes.
1935       * @see #setMaxVirtualMemoryForTask(long)
1936       * @deprecated Use {@link #getMemoryForMapTask()} and
1937       *             {@link #getMemoryForReduceTask()}
1938       */
1939      @Deprecated
1940      public long getMaxVirtualMemoryForTask() {
1941        LOG.warn(
1942          "getMaxVirtualMemoryForTask() is deprecated. " +
1943          "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1944    
1945        long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1946            Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
1947        return value;
1948      }
1949    
1950      /**
1951       * Set the maximum amount of memory any task of this job can use. See
1952       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1953       * <p/>
1954       * mapred.task.maxvmem is split into
1955       * mapreduce.map.memory.mb
1956       * and mapreduce.map.memory.mb,mapred
1957       * each of the new key are set
1958       * as mapred.task.maxvmem / 1024
1959       * as new values are in MB
1960       *
1961       * @param vmem Maximum amount of virtual memory in bytes any task of this job
1962       *             can use.
1963       * @see #getMaxVirtualMemoryForTask()
1964       * @deprecated
1965       *  Use {@link #setMemoryForMapTask(long mem)}  and
1966       *  Use {@link #setMemoryForReduceTask(long mem)}
1967       */
1968      @Deprecated
1969      public void setMaxVirtualMemoryForTask(long vmem) {
1970        LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
1971          "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
1972        if (vmem < 0) {
1973          throw new IllegalArgumentException("Task memory allocation may not be < 0");
1974        }
1975    
1976        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
1977          setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
1978          setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
1979        }else{
1980          this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
1981        }
1982      }
1983    
1984      /**
1985       * @deprecated this variable is deprecated and nolonger in use.
1986       */
1987      @Deprecated
1988      public long getMaxPhysicalMemoryForTask() {
1989        LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1990                  + " Refer to the APIs getMemoryForMapTask() and"
1991                  + " getMemoryForReduceTask() for details.");
1992        return -1;
1993      }
1994    
1995      /*
1996       * @deprecated this
1997       */
1998      @Deprecated
1999      public void setMaxPhysicalMemoryForTask(long mem) {
2000        LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2001            + " The value set is ignored. Refer to "
2002            + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2003      }
2004    
2005      static String deprecatedString(String key) {
2006        return "The variable " + key + " is no longer used.";
2007      }
2008    
2009      private void checkAndWarnDeprecation() {
2010        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2011          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2012                    + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2013                    + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2014        }
2015        if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
2016          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
2017        }
2018        if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
2019          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
2020        }
2021        if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
2022          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
2023        }
2024      }
2025      
2026    
2027    }
2028