001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    
022    import java.io.IOException;
023    import java.util.regex.Pattern;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceAudience.Private;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileStatus;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.LongWritable;
035    import org.apache.hadoop.io.RawComparator;
036    import org.apache.hadoop.io.Text;
037    import org.apache.hadoop.io.WritableComparable;
038    import org.apache.hadoop.io.WritableComparator;
039    import org.apache.hadoop.io.compress.CompressionCodec;
040    import org.apache.hadoop.mapred.lib.HashPartitioner;
041    import org.apache.hadoop.mapred.lib.IdentityMapper;
042    import org.apache.hadoop.mapred.lib.IdentityReducer;
043    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044    import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045    import org.apache.hadoop.mapreduce.MRConfig;
046    import org.apache.hadoop.mapreduce.MRJobConfig;
047    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048    import org.apache.hadoop.mapreduce.util.ConfigUtil;
049    import org.apache.hadoop.security.Credentials;
050    import org.apache.hadoop.util.ClassUtil;
051    import org.apache.hadoop.util.ReflectionUtils;
052    import org.apache.hadoop.util.Tool;
053    import org.apache.log4j.Level;
054    
055    /** 
056     * A map/reduce job configuration.
057     * 
058     * <p><code>JobConf</code> is the primary interface for a user to describe a 
059     * map-reduce job to the Hadoop framework for execution. The framework tries to
060     * faithfully execute the job as-is described by <code>JobConf</code>, however:
061     * <ol>
062     *   <li>
063     *   Some configuration parameters might have been marked as 
064     *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065     *   final</a> by administrators and hence cannot be altered.
066     *   </li>
067     *   <li>
068     *   While some job parameters are straight-forward to set 
069     *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 
070     *   rest of the framework and/or job-configuration and is relatively more 
071     *   complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
072     *   </li>
073     * </ol></p>
074     * 
075     * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
076     * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
077     * {@link OutputFormat} implementations to be used etc.
078     *
079     * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
080     * of the job such as <code>Comparator</code>s to be used, files to be put in  
081     * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
082     * are to be compressed (and how), debugability via user-provided scripts 
083     * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
084     * for doing post-processing on task logs, task's stdout, stderr, syslog. 
085     * and etc.</p>
086     * 
087     * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
088     * <p><blockquote><pre>
089     *     // Create a new JobConf
090     *     JobConf job = new JobConf(new Configuration(), MyJob.class);
091     *     
092     *     // Specify various job-specific parameters     
093     *     job.setJobName("myjob");
094     *     
095     *     FileInputFormat.setInputPaths(job, new Path("in"));
096     *     FileOutputFormat.setOutputPath(job, new Path("out"));
097     *     
098     *     job.setMapperClass(MyJob.MyMapper.class);
099     *     job.setCombinerClass(MyJob.MyReducer.class);
100     *     job.setReducerClass(MyJob.MyReducer.class);
101     *     
102     *     job.setInputFormat(SequenceFileInputFormat.class);
103     *     job.setOutputFormat(SequenceFileOutputFormat.class);
104     * </pre></blockquote></p>
105     * 
106     * @see JobClient
107     * @see ClusterStatus
108     * @see Tool
109     * @see DistributedCache
110     */
111    @InterfaceAudience.Public
112    @InterfaceStability.Stable
113    public class JobConf extends Configuration {
114      
115      private static final Log LOG = LogFactory.getLog(JobConf.class);
116    
117      static{
118        ConfigUtil.loadResources();
119      }
120    
121      /**
122       * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
123       * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
124       */
125      @Deprecated
126      public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
127        "mapred.task.maxvmem";
128    
129      /**
130       * @deprecated 
131       */
132      @Deprecated
133      public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
134        "mapred.task.limit.maxvmem";
135    
136      /**
137       * @deprecated
138       */
139      @Deprecated
140      public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
141        "mapred.task.default.maxvmem";
142    
143      /**
144       * @deprecated
145       */
146      @Deprecated
147      public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
148        "mapred.task.maxpmem";
149    
150      /**
151       * A value which if set for memory related configuration options,
152       * indicates that the options are turned off.
153       */
154      public static final long DISABLED_MEMORY_LIMIT = -1L;
155    
156      /**
157       * Property name for the configuration property mapreduce.cluster.local.dir
158       */
159      public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
160    
161      /**
162       * Name of the queue to which jobs will be submitted, if no queue
163       * name is mentioned.
164       */
165      public static final String DEFAULT_QUEUE_NAME = "default";
166    
167      static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
168          JobContext.MAP_MEMORY_MB;
169    
170      static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
171        JobContext.REDUCE_MEMORY_MB;
172    
173      /**
174       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
175       * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
176       */
177      @Deprecated
178      public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
179          "mapred.job.map.memory.mb";
180    
181      /**
182       * The variable is kept for M/R 1.x applications, while M/R 2.x applications
183       * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
184       */
185      @Deprecated
186      public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
187          "mapred.job.reduce.memory.mb";
188    
189      /** Pattern for the default unpacking behavior for job jars */
190      public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
191        Pattern.compile("(?:classes/|lib/).*");
192    
193      /**
194       * Configuration key to set the java command line options for the child
195       * map and reduce tasks.
196       * 
197       * Java opts for the task tracker child processes.
198       * The following symbol, if present, will be interpolated: @taskid@. 
199       * It is replaced by current TaskID. Any other occurrences of '@' will go 
200       * unchanged.
201       * For example, to enable verbose gc logging to a file named for the taskid in
202       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
203       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
204       * 
205       * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
206       * other environment variables to the child processes.
207       * 
208       * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
209       *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
210       */
211      @Deprecated
212      public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
213      
214      /**
215       * Configuration key to set the java command line options for the map tasks.
216       * 
217       * Java opts for the task tracker child map processes.
218       * The following symbol, if present, will be interpolated: @taskid@. 
219       * It is replaced by current TaskID. Any other occurrences of '@' will go 
220       * unchanged.
221       * For example, to enable verbose gc logging to a file named for the taskid in
222       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
223       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
224       * 
225       * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
226       * other environment variables to the map processes.
227       */
228      public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
229        JobContext.MAP_JAVA_OPTS;
230      
231      /**
232       * Configuration key to set the java command line options for the reduce tasks.
233       * 
234       * Java opts for the task tracker child reduce processes.
235       * The following symbol, if present, will be interpolated: @taskid@. 
236       * It is replaced by current TaskID. Any other occurrences of '@' will go 
237       * unchanged.
238       * For example, to enable verbose gc logging to a file named for the taskid in
239       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
240       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
241       * 
242       * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
243       * pass process environment variables to the reduce processes.
244       */
245      public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
246        JobContext.REDUCE_JAVA_OPTS;
247      
248      public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
249      
250      /**
251       * @deprecated
252       * Configuration key to set the maximum virtual memory available to the child
253       * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
254       * longer have any effect.
255       */
256      @Deprecated
257      public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
258    
259      /**
260       * @deprecated
261       * Configuration key to set the maximum virtual memory available to the
262       * map tasks (in kilo-bytes). This has been deprecated and will no
263       * longer have any effect.
264       */
265      @Deprecated
266      public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
267      
268      /**
269       * @deprecated
270       * Configuration key to set the maximum virtual memory available to the
271       * reduce tasks (in kilo-bytes). This has been deprecated and will no
272       * longer have any effect.
273       */
274      @Deprecated
275      public static final String MAPRED_REDUCE_TASK_ULIMIT =
276        "mapreduce.reduce.ulimit";
277    
278    
279      /**
280       * Configuration key to set the environment of the child map/reduce tasks.
281       * 
282       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
283       * reference existing environment variables via <code>$key</code> on
284       * Linux or <code>%key%</code> on Windows.
285       * 
286       * Example:
287       * <ul>
288       *   <li> A=foo - This will set the env variable A to foo. </li>
289       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
290       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
291       * </ul>
292       * 
293       * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
294       *                 {@link #MAPRED_REDUCE_TASK_ENV}
295       */
296      @Deprecated
297      public static final String MAPRED_TASK_ENV = "mapred.child.env";
298    
299      /**
300       * Configuration key to set the environment of the child map tasks.
301       * 
302       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
303       * reference existing environment variables via <code>$key</code> on
304       * Linux or <code>%key%</code> on Windows.
305       * 
306       * Example:
307       * <ul>
308       *   <li> A=foo - This will set the env variable A to foo. </li>
309       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
310       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
311       * </ul>
312       */
313      public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
314      
315      /**
316       * Configuration key to set the environment of the child reduce tasks.
317       * 
318       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
319       * reference existing environment variables via <code>$key</code> on
320       * Linux or <code>%key%</code> on Windows.
321       * 
322       * Example:
323       * <ul>
324       *   <li> A=foo - This will set the env variable A to foo. </li>
325       *   <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
326       *   <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
327       * </ul>
328       */
329      public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
330    
331      private Credentials credentials = new Credentials();
332      
333      /**
334       * Configuration key to set the logging {@link Level} for the map task.
335       *
336       * The allowed logging levels are:
337       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
338       */
339      public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
340        JobContext.MAP_LOG_LEVEL;
341      
342      /**
343       * Configuration key to set the logging {@link Level} for the reduce task.
344       *
345       * The allowed logging levels are:
346       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
347       */
348      public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
349        JobContext.REDUCE_LOG_LEVEL;
350      
351      /**
352       * Default logging level for map/reduce tasks.
353       */
354      public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
355    
356      /**
357       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
358       * use {@link MRJobConfig#WORKFLOW_ID} instead
359       */
360      @Deprecated
361      public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
362    
363      /**
364       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
365       * use {@link MRJobConfig#WORKFLOW_NAME} instead
366       */
367      @Deprecated
368      public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
369    
370      /**
371       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
372       * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
373       */
374      @Deprecated
375      public static final String WORKFLOW_NODE_NAME =
376          MRJobConfig.WORKFLOW_NODE_NAME;
377    
378      /**
379       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
380       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
381       */
382      @Deprecated
383      public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
384          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
385    
386      /**
387       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
388       * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
389       */
390      @Deprecated
391      public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
392          MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
393    
394      /**
395       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
396       * use {@link MRJobConfig#WORKFLOW_TAGS} instead
397       */
398      @Deprecated
399      public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
400    
401      /**
402       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
403       * not use it
404       */
405      @Deprecated
406      public static final String MAPREDUCE_RECOVER_JOB =
407          "mapreduce.job.restart.recover";
408    
409      /**
410       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
411       * not use it
412       */
413      @Deprecated
414      public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
415    
416      /**
417       * Construct a map/reduce job configuration.
418       */
419      public JobConf() {
420        checkAndWarnDeprecation();
421      }
422    
423      /** 
424       * Construct a map/reduce job configuration.
425       * 
426       * @param exampleClass a class whose containing jar is used as the job's jar.
427       */
428      public JobConf(Class exampleClass) {
429        setJarByClass(exampleClass);
430        checkAndWarnDeprecation();
431      }
432      
433      /**
434       * Construct a map/reduce job configuration.
435       * 
436       * @param conf a Configuration whose settings will be inherited.
437       */
438      public JobConf(Configuration conf) {
439        super(conf);
440        
441        if (conf instanceof JobConf) {
442          JobConf that = (JobConf)conf;
443          credentials = that.credentials;
444        }
445        
446        checkAndWarnDeprecation();
447      }
448    
449    
450      /** Construct a map/reduce job configuration.
451       * 
452       * @param conf a Configuration whose settings will be inherited.
453       * @param exampleClass a class whose containing jar is used as the job's jar.
454       */
455      public JobConf(Configuration conf, Class exampleClass) {
456        this(conf);
457        setJarByClass(exampleClass);
458      }
459    
460    
461      /** Construct a map/reduce configuration.
462       *
463       * @param config a Configuration-format XML job description file.
464       */
465      public JobConf(String config) {
466        this(new Path(config));
467      }
468    
469      /** Construct a map/reduce configuration.
470       *
471       * @param config a Configuration-format XML job description file.
472       */
473      public JobConf(Path config) {
474        super();
475        addResource(config);
476        checkAndWarnDeprecation();
477      }
478    
479      /** A new map/reduce configuration where the behavior of reading from the
480       * default resources can be turned off.
481       * <p/>
482       * If the parameter {@code loadDefaults} is false, the new instance
483       * will not load resources from the default files.
484       *
485       * @param loadDefaults specifies whether to load from the default files
486       */
487      public JobConf(boolean loadDefaults) {
488        super(loadDefaults);
489        checkAndWarnDeprecation();
490      }
491    
492      /**
493       * Get credentials for the job.
494       * @return credentials for the job
495       */
496      public Credentials getCredentials() {
497        return credentials;
498      }
499      
500      @Private
501      public void setCredentials(Credentials credentials) {
502        this.credentials = credentials;
503      }
504      
505      /**
506       * Get the user jar for the map-reduce job.
507       * 
508       * @return the user jar for the map-reduce job.
509       */
510      public String getJar() { return get(JobContext.JAR); }
511      
512      /**
513       * Set the user jar for the map-reduce job.
514       * 
515       * @param jar the user jar for the map-reduce job.
516       */
517      public void setJar(String jar) { set(JobContext.JAR, jar); }
518    
519      /**
520       * Get the pattern for jar contents to unpack on the tasktracker
521       */
522      public Pattern getJarUnpackPattern() {
523        return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
524      }
525    
526      
527      /**
528       * Set the job's jar file by finding an example class location.
529       * 
530       * @param cls the example class.
531       */
532      public void setJarByClass(Class cls) {
533        String jar = ClassUtil.findContainingJar(cls);
534        if (jar != null) {
535          setJar(jar);
536        }   
537      }
538    
539      public String[] getLocalDirs() throws IOException {
540        return getTrimmedStrings(MRConfig.LOCAL_DIR);
541      }
542    
543      /**
544       * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
545       */
546      @Deprecated
547      public void deleteLocalFiles() throws IOException {
548        String[] localDirs = getLocalDirs();
549        for (int i = 0; i < localDirs.length; i++) {
550          FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
551        }
552      }
553    
554      public void deleteLocalFiles(String subdir) throws IOException {
555        String[] localDirs = getLocalDirs();
556        for (int i = 0; i < localDirs.length; i++) {
557          FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
558        }
559      }
560    
561      /** 
562       * Constructs a local file name. Files are distributed among configured
563       * local directories.
564       */
565      public Path getLocalPath(String pathString) throws IOException {
566        return getLocalPath(MRConfig.LOCAL_DIR, pathString);
567      }
568    
569      /**
570       * Get the reported username for this job.
571       * 
572       * @return the username
573       */
574      public String getUser() {
575        return get(JobContext.USER_NAME);
576      }
577      
578      /**
579       * Set the reported username for this job.
580       * 
581       * @param user the username for this job.
582       */
583      public void setUser(String user) {
584        set(JobContext.USER_NAME, user);
585      }
586    
587    
588      
589      /**
590       * Set whether the framework should keep the intermediate files for 
591       * failed tasks.
592       * 
593       * @param keep <code>true</code> if framework should keep the intermediate files 
594       *             for failed tasks, <code>false</code> otherwise.
595       * 
596       */
597      public void setKeepFailedTaskFiles(boolean keep) {
598        setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
599      }
600      
601      /**
602       * Should the temporary files for failed tasks be kept?
603       * 
604       * @return should the files be kept?
605       */
606      public boolean getKeepFailedTaskFiles() {
607        return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
608      }
609      
610      /**
611       * Set a regular expression for task names that should be kept. 
612       * The regular expression ".*_m_000123_0" would keep the files
613       * for the first instance of map 123 that ran.
614       * 
615       * @param pattern the java.util.regex.Pattern to match against the 
616       *        task names.
617       */
618      public void setKeepTaskFilesPattern(String pattern) {
619        set(JobContext.PRESERVE_FILES_PATTERN, pattern);
620      }
621      
622      /**
623       * Get the regular expression that is matched against the task names
624       * to see if we need to keep the files.
625       * 
626       * @return the pattern as a string, if it was set, othewise null.
627       */
628      public String getKeepTaskFilesPattern() {
629        return get(JobContext.PRESERVE_FILES_PATTERN);
630      }
631      
632      /**
633       * Set the current working directory for the default file system.
634       * 
635       * @param dir the new current working directory.
636       */
637      public void setWorkingDirectory(Path dir) {
638        dir = new Path(getWorkingDirectory(), dir);
639        set(JobContext.WORKING_DIR, dir.toString());
640      }
641      
642      /**
643       * Get the current working directory for the default file system.
644       * 
645       * @return the directory name.
646       */
647      public Path getWorkingDirectory() {
648        String name = get(JobContext.WORKING_DIR);
649        if (name != null) {
650          return new Path(name);
651        } else {
652          try {
653            Path dir = FileSystem.get(this).getWorkingDirectory();
654            set(JobContext.WORKING_DIR, dir.toString());
655            return dir;
656          } catch (IOException e) {
657            throw new RuntimeException(e);
658          }
659        }
660      }
661      
662      /**
663       * Sets the number of tasks that a spawned task JVM should run
664       * before it exits
665       * @param numTasks the number of tasks to execute; defaults to 1;
666       * -1 signifies no limit
667       */
668      public void setNumTasksToExecutePerJvm(int numTasks) {
669        setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
670      }
671      
672      /**
673       * Get the number of tasks that a spawned JVM should execute
674       */
675      public int getNumTasksToExecutePerJvm() {
676        return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
677      }
678      
679      /**
680       * Get the {@link InputFormat} implementation for the map-reduce job,
681       * defaults to {@link TextInputFormat} if not specified explicity.
682       * 
683       * @return the {@link InputFormat} implementation for the map-reduce job.
684       */
685      public InputFormat getInputFormat() {
686        return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
687                                                                 TextInputFormat.class,
688                                                                 InputFormat.class),
689                                                        this);
690      }
691      
692      /**
693       * Set the {@link InputFormat} implementation for the map-reduce job.
694       * 
695       * @param theClass the {@link InputFormat} implementation for the map-reduce 
696       *                 job.
697       */
698      public void setInputFormat(Class<? extends InputFormat> theClass) {
699        setClass("mapred.input.format.class", theClass, InputFormat.class);
700      }
701      
702      /**
703       * Get the {@link OutputFormat} implementation for the map-reduce job,
704       * defaults to {@link TextOutputFormat} if not specified explicity.
705       * 
706       * @return the {@link OutputFormat} implementation for the map-reduce job.
707       */
708      public OutputFormat getOutputFormat() {
709        return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
710                                                                  TextOutputFormat.class,
711                                                                  OutputFormat.class),
712                                                         this);
713      }
714    
715      /**
716       * Get the {@link OutputCommitter} implementation for the map-reduce job,
717       * defaults to {@link FileOutputCommitter} if not specified explicitly.
718       * 
719       * @return the {@link OutputCommitter} implementation for the map-reduce job.
720       */
721      public OutputCommitter getOutputCommitter() {
722        return (OutputCommitter)ReflectionUtils.newInstance(
723          getClass("mapred.output.committer.class", FileOutputCommitter.class,
724                   OutputCommitter.class), this);
725      }
726    
727      /**
728       * Set the {@link OutputCommitter} implementation for the map-reduce job.
729       * 
730       * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
731       *                 job.
732       */
733      public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
734        setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
735      }
736      
737      /**
738       * Set the {@link OutputFormat} implementation for the map-reduce job.
739       * 
740       * @param theClass the {@link OutputFormat} implementation for the map-reduce 
741       *                 job.
742       */
743      public void setOutputFormat(Class<? extends OutputFormat> theClass) {
744        setClass("mapred.output.format.class", theClass, OutputFormat.class);
745      }
746    
747      /**
748       * Should the map outputs be compressed before transfer?
749       * 
750       * @param compress should the map outputs be compressed?
751       */
752      public void setCompressMapOutput(boolean compress) {
753        setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
754      }
755      
756      /**
757       * Are the outputs of the maps be compressed?
758       * 
759       * @return <code>true</code> if the outputs of the maps are to be compressed,
760       *         <code>false</code> otherwise.
761       */
762      public boolean getCompressMapOutput() {
763        return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
764      }
765    
766      /**
767       * Set the given class as the  {@link CompressionCodec} for the map outputs.
768       * 
769       * @param codecClass the {@link CompressionCodec} class that will compress  
770       *                   the map outputs.
771       */
772      public void 
773      setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
774        setCompressMapOutput(true);
775        setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
776                 CompressionCodec.class);
777      }
778      
779      /**
780       * Get the {@link CompressionCodec} for compressing the map outputs.
781       * 
782       * @param defaultValue the {@link CompressionCodec} to return if not set
783       * @return the {@link CompressionCodec} class that should be used to compress the 
784       *         map outputs.
785       * @throws IllegalArgumentException if the class was specified, but not found
786       */
787      public Class<? extends CompressionCodec> 
788      getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
789        Class<? extends CompressionCodec> codecClass = defaultValue;
790        String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
791        if (name != null) {
792          try {
793            codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
794          } catch (ClassNotFoundException e) {
795            throw new IllegalArgumentException("Compression codec " + name + 
796                                               " was not found.", e);
797          }
798        }
799        return codecClass;
800      }
801      
802      /**
803       * Get the key class for the map output data. If it is not set, use the
804       * (final) output key class. This allows the map output key class to be
805       * different than the final output key class.
806       *  
807       * @return the map output key class.
808       */
809      public Class<?> getMapOutputKeyClass() {
810        Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
811        if (retv == null) {
812          retv = getOutputKeyClass();
813        }
814        return retv;
815      }
816      
817      /**
818       * Set the key class for the map output data. This allows the user to
819       * specify the map output key class to be different than the final output
820       * value class.
821       * 
822       * @param theClass the map output key class.
823       */
824      public void setMapOutputKeyClass(Class<?> theClass) {
825        setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
826      }
827      
828      /**
829       * Get the value class for the map output data. If it is not set, use the
830       * (final) output value class This allows the map output value class to be
831       * different than the final output value class.
832       *  
833       * @return the map output value class.
834       */
835      public Class<?> getMapOutputValueClass() {
836        Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
837            Object.class);
838        if (retv == null) {
839          retv = getOutputValueClass();
840        }
841        return retv;
842      }
843      
844      /**
845       * Set the value class for the map output data. This allows the user to
846       * specify the map output value class to be different than the final output
847       * value class.
848       * 
849       * @param theClass the map output value class.
850       */
851      public void setMapOutputValueClass(Class<?> theClass) {
852        setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
853      }
854      
855      /**
856       * Get the key class for the job output data.
857       * 
858       * @return the key class for the job output data.
859       */
860      public Class<?> getOutputKeyClass() {
861        return getClass(JobContext.OUTPUT_KEY_CLASS,
862                        LongWritable.class, Object.class);
863      }
864      
865      /**
866       * Set the key class for the job output data.
867       * 
868       * @param theClass the key class for the job output data.
869       */
870      public void setOutputKeyClass(Class<?> theClass) {
871        setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
872      }
873    
874      /**
875       * Get the {@link RawComparator} comparator used to compare keys.
876       * 
877       * @return the {@link RawComparator} comparator used to compare keys.
878       */
879      public RawComparator getOutputKeyComparator() {
880        Class<? extends RawComparator> theClass = getClass(
881          JobContext.KEY_COMPARATOR, null, RawComparator.class);
882        if (theClass != null)
883          return ReflectionUtils.newInstance(theClass, this);
884        return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
885      }
886    
887      /**
888       * Set the {@link RawComparator} comparator used to compare keys.
889       * 
890       * @param theClass the {@link RawComparator} comparator used to 
891       *                 compare keys.
892       * @see #setOutputValueGroupingComparator(Class)                 
893       */
894      public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
895        setClass(JobContext.KEY_COMPARATOR,
896                 theClass, RawComparator.class);
897      }
898    
899      /**
900       * Set the {@link KeyFieldBasedComparator} options used to compare keys.
901       * 
902       * @param keySpec the key specification of the form -k pos1[,pos2], where,
903       *  pos is of the form f[.c][opts], where f is the number
904       *  of the key field to use, and c is the number of the first character from
905       *  the beginning of the field. Fields and character posns are numbered 
906       *  starting with 1; a character position of zero in pos2 indicates the
907       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
908       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
909       *  (the end of the field). opts are ordering options. The supported options
910       *  are:
911       *    -n, (Sort numerically)
912       *    -r, (Reverse the result of comparison)                 
913       */
914      public void setKeyFieldComparatorOptions(String keySpec) {
915        setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
916        set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
917      }
918      
919      /**
920       * Get the {@link KeyFieldBasedComparator} options
921       */
922      public String getKeyFieldComparatorOption() {
923        return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
924      }
925    
926      /**
927       * Set the {@link KeyFieldBasedPartitioner} options used for 
928       * {@link Partitioner}
929       * 
930       * @param keySpec the key specification of the form -k pos1[,pos2], where,
931       *  pos is of the form f[.c][opts], where f is the number
932       *  of the key field to use, and c is the number of the first character from
933       *  the beginning of the field. Fields and character posns are numbered 
934       *  starting with 1; a character position of zero in pos2 indicates the
935       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
936       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
937       *  (the end of the field).
938       */
939      public void setKeyFieldPartitionerOptions(String keySpec) {
940        setPartitionerClass(KeyFieldBasedPartitioner.class);
941        set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
942      }
943      
944      /**
945       * Get the {@link KeyFieldBasedPartitioner} options
946       */
947      public String getKeyFieldPartitionerOption() {
948        return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
949      }
950    
951      /**
952       * Get the user defined {@link WritableComparable} comparator for
953       * grouping keys of inputs to the combiner.
954       *
955       * @return comparator set by the user for grouping values.
956       * @see #setCombinerKeyGroupingComparator(Class) for details.
957       */
958      public RawComparator getCombinerKeyGroupingComparator() {
959        Class<? extends RawComparator> theClass = getClass(
960            JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
961        if (theClass == null) {
962          return getOutputKeyComparator();
963        }
964    
965        return ReflectionUtils.newInstance(theClass, this);
966      }
967    
968      /** 
969       * Get the user defined {@link WritableComparable} comparator for 
970       * grouping keys of inputs to the reduce.
971       * 
972       * @return comparator set by the user for grouping values.
973       * @see #setOutputValueGroupingComparator(Class) for details.
974       */
975      public RawComparator getOutputValueGroupingComparator() {
976        Class<? extends RawComparator> theClass = getClass(
977          JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
978        if (theClass == null) {
979          return getOutputKeyComparator();
980        }
981        
982        return ReflectionUtils.newInstance(theClass, this);
983      }
984    
985      /**
986       * Set the user defined {@link RawComparator} comparator for
987       * grouping keys in the input to the combiner.
988       * <p/>
989       * <p>This comparator should be provided if the equivalence rules for keys
990       * for sorting the intermediates are different from those for grouping keys
991       * before each call to
992       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
993       * <p/>
994       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
995       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
996       * <p/>
997       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
998       * how keys are sorted, this can be used in conjunction to simulate
999       * <i>secondary sort on values</i>.</p>
1000       * <p/>
1001       * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1002       * <i>stable</i> in any sense. (In any case, with the order of available
1003       * map-outputs to the combiner being non-deterministic, it wouldn't make
1004       * that much sense.)</p>
1005       *
1006       * @param theClass the comparator class to be used for grouping keys for the
1007       * combiner. It should implement <code>RawComparator</code>.
1008       * @see #setOutputKeyComparatorClass(Class)
1009       */
1010      public void setCombinerKeyGroupingComparator(
1011          Class<? extends RawComparator> theClass) {
1012        setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1013            theClass, RawComparator.class);
1014      }
1015    
1016      /** 
1017       * Set the user defined {@link RawComparator} comparator for 
1018       * grouping keys in the input to the reduce.
1019       * 
1020       * <p>This comparator should be provided if the equivalence rules for keys
1021       * for sorting the intermediates are different from those for grouping keys
1022       * before each call to 
1023       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1024       *  
1025       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1026       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1027       * 
1028       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
1029       * how keys are sorted, this can be used in conjunction to simulate 
1030       * <i>secondary sort on values</i>.</p>
1031       *  
1032       * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
1033       * <i>stable</i> in any sense. (In any case, with the order of available 
1034       * map-outputs to the reduce being non-deterministic, it wouldn't make 
1035       * that much sense.)</p>
1036       * 
1037       * @param theClass the comparator class to be used for grouping keys. 
1038       *                 It should implement <code>RawComparator</code>.
1039       * @see #setOutputKeyComparatorClass(Class)
1040       * @see #setCombinerKeyGroupingComparator(Class)
1041       */
1042      public void setOutputValueGroupingComparator(
1043          Class<? extends RawComparator> theClass) {
1044        setClass(JobContext.GROUP_COMPARATOR_CLASS,
1045                 theClass, RawComparator.class);
1046      }
1047    
1048      /**
1049       * Should the framework use the new context-object code for running
1050       * the mapper?
1051       * @return true, if the new api should be used
1052       */
1053      public boolean getUseNewMapper() {
1054        return getBoolean("mapred.mapper.new-api", false);
1055      }
1056      /**
1057       * Set whether the framework should use the new api for the mapper.
1058       * This is the default for jobs submitted with the new Job api.
1059       * @param flag true, if the new api should be used
1060       */
1061      public void setUseNewMapper(boolean flag) {
1062        setBoolean("mapred.mapper.new-api", flag);
1063      }
1064    
1065      /**
1066       * Should the framework use the new context-object code for running
1067       * the reducer?
1068       * @return true, if the new api should be used
1069       */
1070      public boolean getUseNewReducer() {
1071        return getBoolean("mapred.reducer.new-api", false);
1072      }
1073      /**
1074       * Set whether the framework should use the new api for the reducer. 
1075       * This is the default for jobs submitted with the new Job api.
1076       * @param flag true, if the new api should be used
1077       */
1078      public void setUseNewReducer(boolean flag) {
1079        setBoolean("mapred.reducer.new-api", flag);
1080      }
1081    
1082      /**
1083       * Get the value class for job outputs.
1084       * 
1085       * @return the value class for job outputs.
1086       */
1087      public Class<?> getOutputValueClass() {
1088        return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1089      }
1090      
1091      /**
1092       * Set the value class for job outputs.
1093       * 
1094       * @param theClass the value class for job outputs.
1095       */
1096      public void setOutputValueClass(Class<?> theClass) {
1097        setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1098      }
1099    
1100      /**
1101       * Get the {@link Mapper} class for the job.
1102       * 
1103       * @return the {@link Mapper} class for the job.
1104       */
1105      public Class<? extends Mapper> getMapperClass() {
1106        return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1107      }
1108      
1109      /**
1110       * Set the {@link Mapper} class for the job.
1111       * 
1112       * @param theClass the {@link Mapper} class for the job.
1113       */
1114      public void setMapperClass(Class<? extends Mapper> theClass) {
1115        setClass("mapred.mapper.class", theClass, Mapper.class);
1116      }
1117    
1118      /**
1119       * Get the {@link MapRunnable} class for the job.
1120       * 
1121       * @return the {@link MapRunnable} class for the job.
1122       */
1123      public Class<? extends MapRunnable> getMapRunnerClass() {
1124        return getClass("mapred.map.runner.class",
1125                        MapRunner.class, MapRunnable.class);
1126      }
1127      
1128      /**
1129       * Expert: Set the {@link MapRunnable} class for the job.
1130       * 
1131       * Typically used to exert greater control on {@link Mapper}s.
1132       * 
1133       * @param theClass the {@link MapRunnable} class for the job.
1134       */
1135      public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1136        setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1137      }
1138    
1139      /**
1140       * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
1141       * to be sent to the {@link Reducer}s.
1142       * 
1143       * @return the {@link Partitioner} used to partition map-outputs.
1144       */
1145      public Class<? extends Partitioner> getPartitionerClass() {
1146        return getClass("mapred.partitioner.class",
1147                        HashPartitioner.class, Partitioner.class);
1148      }
1149      
1150      /**
1151       * Set the {@link Partitioner} class used to partition 
1152       * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1153       * 
1154       * @param theClass the {@link Partitioner} used to partition map-outputs.
1155       */
1156      public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1157        setClass("mapred.partitioner.class", theClass, Partitioner.class);
1158      }
1159    
1160      /**
1161       * Get the {@link Reducer} class for the job.
1162       * 
1163       * @return the {@link Reducer} class for the job.
1164       */
1165      public Class<? extends Reducer> getReducerClass() {
1166        return getClass("mapred.reducer.class",
1167                        IdentityReducer.class, Reducer.class);
1168      }
1169      
1170      /**
1171       * Set the {@link Reducer} class for the job.
1172       * 
1173       * @param theClass the {@link Reducer} class for the job.
1174       */
1175      public void setReducerClass(Class<? extends Reducer> theClass) {
1176        setClass("mapred.reducer.class", theClass, Reducer.class);
1177      }
1178    
1179      /**
1180       * Get the user-defined <i>combiner</i> class used to combine map-outputs 
1181       * before being sent to the reducers. Typically the combiner is same as the
1182       * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1183       * 
1184       * @return the user-defined combiner class used to combine map-outputs.
1185       */
1186      public Class<? extends Reducer> getCombinerClass() {
1187        return getClass("mapred.combiner.class", null, Reducer.class);
1188      }
1189    
1190      /**
1191       * Set the user-defined <i>combiner</i> class used to combine map-outputs 
1192       * before being sent to the reducers. 
1193       * 
1194       * <p>The combiner is an application-specified aggregation operation, which
1195       * can help cut down the amount of data transferred between the 
1196       * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1197       * 
1198       * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1199       * the mapper and reducer tasks. In general, the combiner is called as the
1200       * sort/merge result is written to disk. The combiner must:
1201       * <ul>
1202       *   <li> be side-effect free</li>
1203       *   <li> have the same input and output key types and the same input and 
1204       *        output value types</li>
1205       * </ul></p>
1206       * 
1207       * <p>Typically the combiner is same as the <code>Reducer</code> for the  
1208       * job i.e. {@link #setReducerClass(Class)}.</p>
1209       * 
1210       * @param theClass the user-defined combiner class used to combine 
1211       *                 map-outputs.
1212       */
1213      public void setCombinerClass(Class<? extends Reducer> theClass) {
1214        setClass("mapred.combiner.class", theClass, Reducer.class);
1215      }
1216      
1217      /**
1218       * Should speculative execution be used for this job? 
1219       * Defaults to <code>true</code>.
1220       * 
1221       * @return <code>true</code> if speculative execution be used for this job,
1222       *         <code>false</code> otherwise.
1223       */
1224      public boolean getSpeculativeExecution() { 
1225        return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1226      }
1227      
1228      /**
1229       * Turn speculative execution on or off for this job. 
1230       * 
1231       * @param speculativeExecution <code>true</code> if speculative execution 
1232       *                             should be turned on, else <code>false</code>.
1233       */
1234      public void setSpeculativeExecution(boolean speculativeExecution) {
1235        setMapSpeculativeExecution(speculativeExecution);
1236        setReduceSpeculativeExecution(speculativeExecution);
1237      }
1238    
1239      /**
1240       * Should speculative execution be used for this job for map tasks? 
1241       * Defaults to <code>true</code>.
1242       * 
1243       * @return <code>true</code> if speculative execution be 
1244       *                           used for this job for map tasks,
1245       *         <code>false</code> otherwise.
1246       */
1247      public boolean getMapSpeculativeExecution() { 
1248        return getBoolean(JobContext.MAP_SPECULATIVE, true);
1249      }
1250      
1251      /**
1252       * Turn speculative execution on or off for this job for map tasks. 
1253       * 
1254       * @param speculativeExecution <code>true</code> if speculative execution 
1255       *                             should be turned on for map tasks,
1256       *                             else <code>false</code>.
1257       */
1258      public void setMapSpeculativeExecution(boolean speculativeExecution) {
1259        setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1260      }
1261    
1262      /**
1263       * Should speculative execution be used for this job for reduce tasks? 
1264       * Defaults to <code>true</code>.
1265       * 
1266       * @return <code>true</code> if speculative execution be used 
1267       *                           for reduce tasks for this job,
1268       *         <code>false</code> otherwise.
1269       */
1270      public boolean getReduceSpeculativeExecution() { 
1271        return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1272      }
1273      
1274      /**
1275       * Turn speculative execution on or off for this job for reduce tasks. 
1276       * 
1277       * @param speculativeExecution <code>true</code> if speculative execution 
1278       *                             should be turned on for reduce tasks,
1279       *                             else <code>false</code>.
1280       */
1281      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1282        setBoolean(JobContext.REDUCE_SPECULATIVE, 
1283                   speculativeExecution);
1284      }
1285    
1286      /**
1287       * Get configured the number of reduce tasks for this job.
1288       * Defaults to <code>1</code>.
1289       * 
1290       * @return the number of reduce tasks for this job.
1291       */
1292      public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1293      
1294      /**
1295       * Set the number of map tasks for this job.
1296       * 
1297       * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
1298       * number of spawned map tasks depends on the number of {@link InputSplit}s 
1299       * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1300       *  
1301       * A custom {@link InputFormat} is typically used to accurately control 
1302       * the number of map tasks for the job.</p>
1303       * 
1304       * <h4 id="NoOfMaps">How many maps?</h4>
1305       * 
1306       * <p>The number of maps is usually driven by the total size of the inputs 
1307       * i.e. total number of blocks of the input files.</p>
1308       *  
1309       * <p>The right level of parallelism for maps seems to be around 10-100 maps 
1310       * per-node, although it has been set up to 300 or so for very cpu-light map 
1311       * tasks. Task setup takes awhile, so it is best if the maps take at least a 
1312       * minute to execute.</p>
1313       * 
1314       * <p>The default behavior of file-based {@link InputFormat}s is to split the 
1315       * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
1316       * bytes, of input files. However, the {@link FileSystem} blocksize of the 
1317       * input files is treated as an upper bound for input splits. A lower bound 
1318       * on the split size can be set via 
1319       * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1320       * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1321       *  
1322       * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
1323       * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
1324       * used to set it even higher.</p>
1325       * 
1326       * @param n the number of map tasks for this job.
1327       * @see InputFormat#getSplits(JobConf, int)
1328       * @see FileInputFormat
1329       * @see FileSystem#getDefaultBlockSize()
1330       * @see FileStatus#getBlockSize()
1331       */
1332      public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1333    
1334      /**
1335       * Get configured the number of reduce tasks for this job. Defaults to 
1336       * <code>1</code>.
1337       * 
1338       * @return the number of reduce tasks for this job.
1339       */
1340      public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1341      
1342      /**
1343       * Set the requisite number of reduce tasks for this job.
1344       * 
1345       * <h4 id="NoOfReduces">How many reduces?</h4>
1346       * 
1347       * <p>The right number of reduces seems to be <code>0.95</code> or 
1348       * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
1349       * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1350       * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1351       * </p>
1352       * 
1353       * <p>With <code>0.95</code> all of the reduces can launch immediately and 
1354       * start transfering map outputs as the maps finish. With <code>1.75</code> 
1355       * the faster nodes will finish their first round of reduces and launch a 
1356       * second wave of reduces doing a much better job of load balancing.</p>
1357       * 
1358       * <p>Increasing the number of reduces increases the framework overhead, but 
1359       * increases load balancing and lowers the cost of failures.</p>
1360       * 
1361       * <p>The scaling factors above are slightly less than whole numbers to 
1362       * reserve a few reduce slots in the framework for speculative-tasks, failures
1363       * etc.</p> 
1364       *
1365       * <h4 id="ReducerNone">Reducer NONE</h4>
1366       * 
1367       * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1368       * 
1369       * <p>In this case the output of the map-tasks directly go to distributed 
1370       * file-system, to the path set by 
1371       * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
1372       * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1373       * 
1374       * @param n the number of reduce tasks for this job.
1375       */
1376      public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1377      
1378      /** 
1379       * Get the configured number of maximum attempts that will be made to run a
1380       * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1381       * property. If this property is not already set, the default is 4 attempts.
1382       *  
1383       * @return the max number of attempts per map task.
1384       */
1385      public int getMaxMapAttempts() {
1386        return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1387      }
1388      
1389      /** 
1390       * Expert: Set the number of maximum attempts that will be made to run a
1391       * map task.
1392       * 
1393       * @param n the number of attempts per map task.
1394       */
1395      public void setMaxMapAttempts(int n) {
1396        setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1397      }
1398    
1399      /** 
1400       * Get the configured number of maximum attempts  that will be made to run a
1401       * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1402       * property. If this property is not already set, the default is 4 attempts.
1403       * 
1404       * @return the max number of attempts per reduce task.
1405       */
1406      public int getMaxReduceAttempts() {
1407        return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1408      }
1409      /** 
1410       * Expert: Set the number of maximum attempts that will be made to run a
1411       * reduce task.
1412       * 
1413       * @param n the number of attempts per reduce task.
1414       */
1415      public void setMaxReduceAttempts(int n) {
1416        setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1417      }
1418      
1419      /**
1420       * Get the user-specified job name. This is only used to identify the 
1421       * job to the user.
1422       * 
1423       * @return the job's name, defaulting to "".
1424       */
1425      public String getJobName() {
1426        return get(JobContext.JOB_NAME, "");
1427      }
1428      
1429      /**
1430       * Set the user-specified job name.
1431       * 
1432       * @param name the job's new name.
1433       */
1434      public void setJobName(String name) {
1435        set(JobContext.JOB_NAME, name);
1436      }
1437      
1438      /**
1439       * Get the user-specified session identifier. The default is the empty string.
1440       *
1441       * The session identifier is used to tag metric data that is reported to some
1442       * performance metrics system via the org.apache.hadoop.metrics API.  The 
1443       * session identifier is intended, in particular, for use by Hadoop-On-Demand 
1444       * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
1445       * HOD will set the session identifier by modifying the mapred-site.xml file 
1446       * before starting the cluster.
1447       *
1448       * When not running under HOD, this identifer is expected to remain set to 
1449       * the empty string.
1450       *
1451       * @return the session identifier, defaulting to "".
1452       */
1453      @Deprecated
1454      public String getSessionId() {
1455          return get("session.id", "");
1456      }
1457      
1458      /**
1459       * Set the user-specified session identifier.  
1460       *
1461       * @param sessionId the new session id.
1462       */
1463      @Deprecated
1464      public void setSessionId(String sessionId) {
1465          set("session.id", sessionId);
1466      }
1467        
1468      /**
1469       * Set the maximum no. of failures of a given job per tasktracker.
1470       * If the no. of task failures exceeds <code>noFailures</code>, the 
1471       * tasktracker is <i>blacklisted</i> for this job. 
1472       * 
1473       * @param noFailures maximum no. of failures of a given job per tasktracker.
1474       */
1475      public void setMaxTaskFailuresPerTracker(int noFailures) {
1476        setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1477      }
1478      
1479      /**
1480       * Expert: Get the maximum no. of failures of a given job per tasktracker.
1481       * If the no. of task failures exceeds this, the tasktracker is
1482       * <i>blacklisted</i> for this job. 
1483       * 
1484       * @return the maximum no. of failures of a given job per tasktracker.
1485       */
1486      public int getMaxTaskFailuresPerTracker() {
1487        return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1488      }
1489    
1490      /**
1491       * Get the maximum percentage of map tasks that can fail without 
1492       * the job being aborted. 
1493       * 
1494       * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1495       * attempts before being declared as <i>failed</i>.
1496       *  
1497       * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1498       * the job being declared as {@link JobStatus#FAILED}.
1499       * 
1500       * @return the maximum percentage of map tasks that can fail without
1501       *         the job being aborted.
1502       */
1503      public int getMaxMapTaskFailuresPercent() {
1504        return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1505      }
1506    
1507      /**
1508       * Expert: Set the maximum percentage of map tasks that can fail without the
1509       * job being aborted. 
1510       * 
1511       * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1512       * before being declared as <i>failed</i>.
1513       * 
1514       * @param percent the maximum percentage of map tasks that can fail without 
1515       *                the job being aborted.
1516       */
1517      public void setMaxMapTaskFailuresPercent(int percent) {
1518        setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1519      }
1520      
1521      /**
1522       * Get the maximum percentage of reduce tasks that can fail without 
1523       * the job being aborted. 
1524       * 
1525       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1526       * attempts before being declared as <i>failed</i>.
1527       * 
1528       * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1529       * in the job being declared as {@link JobStatus#FAILED}.
1530       * 
1531       * @return the maximum percentage of reduce tasks that can fail without
1532       *         the job being aborted.
1533       */
1534      public int getMaxReduceTaskFailuresPercent() {
1535        return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1536      }
1537      
1538      /**
1539       * Set the maximum percentage of reduce tasks that can fail without the job
1540       * being aborted.
1541       * 
1542       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1543       * attempts before being declared as <i>failed</i>.
1544       * 
1545       * @param percent the maximum percentage of reduce tasks that can fail without 
1546       *                the job being aborted.
1547       */
1548      public void setMaxReduceTaskFailuresPercent(int percent) {
1549        setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1550      }
1551      
1552      /**
1553       * Set {@link JobPriority} for this job.
1554       * 
1555       * @param prio the {@link JobPriority} for this job.
1556       */
1557      public void setJobPriority(JobPriority prio) {
1558        set(JobContext.PRIORITY, prio.toString());
1559      }
1560      
1561      /**
1562       * Get the {@link JobPriority} for this job.
1563       * 
1564       * @return the {@link JobPriority} for this job.
1565       */
1566      public JobPriority getJobPriority() {
1567        String prio = get(JobContext.PRIORITY);
1568        if(prio == null) {
1569          return JobPriority.NORMAL;
1570        }
1571        
1572        return JobPriority.valueOf(prio);
1573      }
1574    
1575      /**
1576       * Set JobSubmitHostName for this job.
1577       * 
1578       * @param hostname the JobSubmitHostName for this job.
1579       */
1580      void setJobSubmitHostName(String hostname) {
1581        set(MRJobConfig.JOB_SUBMITHOST, hostname);
1582      }
1583      
1584      /**
1585       * Get the  JobSubmitHostName for this job.
1586       * 
1587       * @return the JobSubmitHostName for this job.
1588       */
1589      String getJobSubmitHostName() {
1590        String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1591        
1592        return hostname;
1593      }
1594    
1595      /**
1596       * Set JobSubmitHostAddress for this job.
1597       * 
1598       * @param hostadd the JobSubmitHostAddress for this job.
1599       */
1600      void setJobSubmitHostAddress(String hostadd) {
1601        set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1602      }
1603      
1604      /**
1605       * Get JobSubmitHostAddress for this job.
1606       * 
1607       * @return  JobSubmitHostAddress for this job.
1608       */
1609      String getJobSubmitHostAddress() {
1610        String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1611        
1612        return hostadd;
1613      }
1614    
1615      /**
1616       * Get whether the task profiling is enabled.
1617       * @return true if some tasks will be profiled
1618       */
1619      public boolean getProfileEnabled() {
1620        return getBoolean(JobContext.TASK_PROFILE, false);
1621      }
1622    
1623      /**
1624       * Set whether the system should collect profiler information for some of 
1625       * the tasks in this job? The information is stored in the user log 
1626       * directory.
1627       * @param newValue true means it should be gathered
1628       */
1629      public void setProfileEnabled(boolean newValue) {
1630        setBoolean(JobContext.TASK_PROFILE, newValue);
1631      }
1632    
1633      /**
1634       * Get the profiler configuration arguments.
1635       *
1636       * The default value for this property is
1637       * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1638       * 
1639       * @return the parameters to pass to the task child to configure profiling
1640       */
1641      public String getProfileParams() {
1642        return get(JobContext.TASK_PROFILE_PARAMS,
1643                   "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1644                     "verbose=n,file=%s");
1645      }
1646    
1647      /**
1648       * Set the profiler configuration arguments. If the string contains a '%s' it
1649       * will be replaced with the name of the profiling output file when the task
1650       * runs.
1651       *
1652       * This value is passed to the task child JVM on the command line.
1653       *
1654       * @param value the configuration string
1655       */
1656      public void setProfileParams(String value) {
1657        set(JobContext.TASK_PROFILE_PARAMS, value);
1658      }
1659    
1660      /**
1661       * Get the range of maps or reduces to profile.
1662       * @param isMap is the task a map?
1663       * @return the task ranges
1664       */
1665      public IntegerRanges getProfileTaskRange(boolean isMap) {
1666        return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1667                           JobContext.NUM_REDUCE_PROFILES), "0-2");
1668      }
1669    
1670      /**
1671       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1672       * must also be called.
1673       * @param newValue a set of integer ranges of the map ids
1674       */
1675      public void setProfileTaskRange(boolean isMap, String newValue) {
1676        // parse the value to make sure it is legal
1677          new Configuration.IntegerRanges(newValue);
1678        set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1679              newValue);
1680      }
1681    
1682      /**
1683       * Set the debug script to run when the map tasks fail.
1684       * 
1685       * <p>The debug script can aid debugging of failed map tasks. The script is 
1686       * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1687       * 
1688       * <p>The debug command, run on the node where the map failed, is:</p>
1689       * <p><pre><blockquote> 
1690       * $script $stdout $stderr $syslog $jobconf.
1691       * </blockquote></pre></p>
1692       * 
1693       * <p> The script file is distributed through {@link DistributedCache} 
1694       * APIs. The script needs to be symlinked. </p>
1695       * 
1696       * <p>Here is an example on how to submit a script 
1697       * <p><blockquote><pre>
1698       * job.setMapDebugScript("./myscript");
1699       * DistributedCache.createSymlink(job);
1700       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1701       * </pre></blockquote></p>
1702       * 
1703       * @param mDbgScript the script name
1704       */
1705      public void  setMapDebugScript(String mDbgScript) {
1706        set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1707      }
1708      
1709      /**
1710       * Get the map task's debug script.
1711       * 
1712       * @return the debug Script for the mapred job for failed map tasks.
1713       * @see #setMapDebugScript(String)
1714       */
1715      public String getMapDebugScript() {
1716        return get(JobContext.MAP_DEBUG_SCRIPT);
1717      }
1718      
1719      /**
1720       * Set the debug script to run when the reduce tasks fail.
1721       * 
1722       * <p>The debug script can aid debugging of failed reduce tasks. The script
1723       * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1724       * 
1725       * <p>The debug command, run on the node where the map failed, is:</p>
1726       * <p><pre><blockquote> 
1727       * $script $stdout $stderr $syslog $jobconf.
1728       * </blockquote></pre></p>
1729       * 
1730       * <p> The script file is distributed through {@link DistributedCache} 
1731       * APIs. The script file needs to be symlinked </p>
1732       * 
1733       * <p>Here is an example on how to submit a script 
1734       * <p><blockquote><pre>
1735       * job.setReduceDebugScript("./myscript");
1736       * DistributedCache.createSymlink(job);
1737       * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1738       * </pre></blockquote></p>
1739       * 
1740       * @param rDbgScript the script name
1741       */
1742      public void  setReduceDebugScript(String rDbgScript) {
1743        set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1744      }
1745      
1746      /**
1747       * Get the reduce task's debug Script
1748       * 
1749       * @return the debug script for the mapred job for failed reduce tasks.
1750       * @see #setReduceDebugScript(String)
1751       */
1752      public String getReduceDebugScript() {
1753        return get(JobContext.REDUCE_DEBUG_SCRIPT);
1754      }
1755    
1756      /**
1757       * Get the uri to be invoked in-order to send a notification after the job 
1758       * has completed (success/failure). 
1759       * 
1760       * @return the job end notification uri, <code>null</code> if it hasn't
1761       *         been set.
1762       * @see #setJobEndNotificationURI(String)
1763       */
1764      public String getJobEndNotificationURI() {
1765        return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1766      }
1767    
1768      /**
1769       * Set the uri to be invoked in-order to send a notification after the job
1770       * has completed (success/failure).
1771       * 
1772       * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
1773       * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1774       * identifier and completion-status respectively.</p>
1775       * 
1776       * <p>This is typically used by application-writers to implement chaining of 
1777       * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1778       * 
1779       * @param uri the job end notification uri
1780       * @see JobStatus
1781       * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1782       *       JobCompletionAndChaining">Job Completion and Chaining</a>
1783       */
1784      public void setJobEndNotificationURI(String uri) {
1785        set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1786      }
1787    
1788      /**
1789       * Get job-specific shared directory for use as scratch space
1790       * 
1791       * <p>
1792       * When a job starts, a shared directory is created at location
1793       * <code>
1794       * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1795       * This directory is exposed to the users through 
1796       * <code>mapreduce.job.local.dir </code>.
1797       * So, the tasks can use this space 
1798       * as scratch space and share files among them. </p>
1799       * This value is available as System property also.
1800       * 
1801       * @return The localized job specific shared directory
1802       */
1803      public String getJobLocalDir() {
1804        return get(JobContext.JOB_LOCAL_DIR);
1805      }
1806    
1807      /**
1808       * Get memory required to run a map task of the job, in MB.
1809       * 
1810       * If a value is specified in the configuration, it is returned.
1811       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1812       * <p/>
1813       * For backward compatibility, if the job configuration sets the
1814       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1815       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1816       * after converting it from bytes to MB.
1817       * @return memory required to run a map task of the job, in MB,
1818       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1819       */
1820      public long getMemoryForMapTask() {
1821        long value = getDeprecatedMemoryValue();
1822        if (value == DISABLED_MEMORY_LIMIT) {
1823          value = normalizeMemoryConfigValue(
1824                    getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
1825                              DISABLED_MEMORY_LIMIT));
1826        }
1827        // In case that M/R 1.x applications use the old property name
1828        if (value == DISABLED_MEMORY_LIMIT) {
1829          value = normalizeMemoryConfigValue(
1830                    getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1831                              DISABLED_MEMORY_LIMIT));
1832        }
1833        return value;
1834      }
1835    
1836      public void setMemoryForMapTask(long mem) {
1837        setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1838        // In case that M/R 1.x applications use the old property name
1839        setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1840      }
1841    
1842      /**
1843       * Get memory required to run a reduce task of the job, in MB.
1844       * 
1845       * If a value is specified in the configuration, it is returned.
1846       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1847       * <p/>
1848       * For backward compatibility, if the job configuration sets the
1849       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1850       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1851       * after converting it from bytes to MB.
1852       * @return memory required to run a reduce task of the job, in MB,
1853       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1854       */
1855      public long getMemoryForReduceTask() {
1856        long value = getDeprecatedMemoryValue();
1857        if (value == DISABLED_MEMORY_LIMIT) {
1858          value = normalizeMemoryConfigValue(
1859                    getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
1860                            DISABLED_MEMORY_LIMIT));
1861        }
1862        // In case that M/R 1.x applications use the old property name
1863        if (value == DISABLED_MEMORY_LIMIT) {
1864          value = normalizeMemoryConfigValue(
1865                    getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1866                            DISABLED_MEMORY_LIMIT));
1867        }
1868        return value;
1869      }
1870      
1871      // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1872      // converted into MBs.
1873      // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1874      // value.
1875      private long getDeprecatedMemoryValue() {
1876        long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1877            DISABLED_MEMORY_LIMIT);
1878        oldValue = normalizeMemoryConfigValue(oldValue);
1879        if (oldValue != DISABLED_MEMORY_LIMIT) {
1880          oldValue /= (1024*1024);
1881        }
1882        return oldValue;
1883      }
1884    
1885      public void setMemoryForReduceTask(long mem) {
1886        setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1887        // In case that M/R 1.x applications use the old property name
1888        setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1889      }
1890    
1891      /**
1892       * Return the name of the queue to which this job is submitted.
1893       * Defaults to 'default'.
1894       * 
1895       * @return name of the queue
1896       */
1897      public String getQueueName() {
1898        return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1899      }
1900      
1901      /**
1902       * Set the name of the queue to which this job should be submitted.
1903       * 
1904       * @param queueName Name of the queue
1905       */
1906      public void setQueueName(String queueName) {
1907        set(JobContext.QUEUE_NAME, queueName);
1908      }
1909      
1910      /**
1911       * Normalize the negative values in configuration
1912       * 
1913       * @param val
1914       * @return normalized value
1915       */
1916      public static long normalizeMemoryConfigValue(long val) {
1917        if (val < 0) {
1918          val = DISABLED_MEMORY_LIMIT;
1919        }
1920        return val;
1921      }
1922    
1923      /**
1924       * Compute the number of slots required to run a single map task-attempt
1925       * of this job.
1926       * @param slotSizePerMap cluster-wide value of the amount of memory required
1927       *                       to run a map-task
1928       * @return the number of slots required to run a single map task-attempt
1929       *          1 if memory parameters are disabled.
1930       */
1931      int computeNumSlotsPerMap(long slotSizePerMap) {
1932        if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
1933            (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
1934          return 1;
1935        }
1936        return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
1937      }
1938      
1939      /**
1940       * Compute the number of slots required to run a single reduce task-attempt
1941       * of this job.
1942       * @param slotSizePerReduce cluster-wide value of the amount of memory 
1943       *                          required to run a reduce-task
1944       * @return the number of slots required to run a single reduce task-attempt
1945       *          1 if memory parameters are disabled
1946       */
1947      int computeNumSlotsPerReduce(long slotSizePerReduce) {
1948        if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
1949            (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
1950          return 1;
1951        }
1952        return 
1953        (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
1954      }
1955    
1956      /** 
1957       * Find a jar that contains a class of the same name, if any.
1958       * It will return a jar file, even if that is not the first thing
1959       * on the class path that has a class with the same name.
1960       * 
1961       * @param my_class the class to find.
1962       * @return a jar file that contains the class, or null.
1963       * @throws IOException
1964       */
1965      public static String findContainingJar(Class my_class) {
1966        return ClassUtil.findContainingJar(my_class);
1967      }
1968    
1969      /**
1970       * Get the memory required to run a task of this job, in bytes. See
1971       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1972       * <p/>
1973       * This method is deprecated. Now, different memory limits can be
1974       * set for map and reduce tasks of a job, in MB. 
1975       * <p/>
1976       * For backward compatibility, if the job configuration sets the
1977       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1978       * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 
1979       * Otherwise, this method will return the larger of the values returned by 
1980       * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1981       * after converting them into bytes.
1982       *
1983       * @return Memory required to run a task of this job, in bytes,
1984       *          or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1985       * @see #setMaxVirtualMemoryForTask(long)
1986       * @deprecated Use {@link #getMemoryForMapTask()} and
1987       *             {@link #getMemoryForReduceTask()}
1988       */
1989      @Deprecated
1990      public long getMaxVirtualMemoryForTask() {
1991        LOG.warn(
1992          "getMaxVirtualMemoryForTask() is deprecated. " +
1993          "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1994    
1995        long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1996        value = normalizeMemoryConfigValue(value);
1997        if (value == DISABLED_MEMORY_LIMIT) {
1998          value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1999          value = normalizeMemoryConfigValue(value);
2000          if (value != DISABLED_MEMORY_LIMIT) {
2001            value *= 1024*1024;
2002          }
2003        }
2004        return value;
2005      }
2006    
2007      /**
2008       * Set the maximum amount of memory any task of this job can use. See
2009       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2010       * <p/>
2011       * mapred.task.maxvmem is split into
2012       * mapreduce.map.memory.mb
2013       * and mapreduce.map.memory.mb,mapred
2014       * each of the new key are set
2015       * as mapred.task.maxvmem / 1024
2016       * as new values are in MB
2017       *
2018       * @param vmem Maximum amount of virtual memory in bytes any task of this job
2019       *             can use.
2020       * @see #getMaxVirtualMemoryForTask()
2021       * @deprecated
2022       *  Use {@link #setMemoryForMapTask(long mem)}  and
2023       *  Use {@link #setMemoryForReduceTask(long mem)}
2024       */
2025      @Deprecated
2026      public void setMaxVirtualMemoryForTask(long vmem) {
2027        LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
2028          "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
2029        if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
2030          setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
2031          setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
2032        }
2033    
2034        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
2035          setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
2036          setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
2037        }else{
2038          this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
2039        }
2040      }
2041    
2042      /**
2043       * @deprecated this variable is deprecated and nolonger in use.
2044       */
2045      @Deprecated
2046      public long getMaxPhysicalMemoryForTask() {
2047        LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2048                  + " Refer to the APIs getMemoryForMapTask() and"
2049                  + " getMemoryForReduceTask() for details.");
2050        return -1;
2051      }
2052    
2053      /*
2054       * @deprecated this
2055       */
2056      @Deprecated
2057      public void setMaxPhysicalMemoryForTask(long mem) {
2058        LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2059            + " The value set is ignored. Refer to "
2060            + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2061      }
2062    
2063      static String deprecatedString(String key) {
2064        return "The variable " + key + " is no longer used.";
2065      }
2066    
2067      private void checkAndWarnDeprecation() {
2068        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2069          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2070                    + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2071                    + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2072        }
2073        if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
2074          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
2075        }
2076        if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
2077          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
2078        }
2079        if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
2080          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
2081        }
2082      }
2083      
2084    
2085    }
2086