/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straightforward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), other parameters interact subtly
 *   with the rest of the framework and/or job configuration and are relatively
 *   more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as the <code>Comparator</code>s to be used, files to be put
 * in the {@link DistributedCache}, whether intermediate and/or job outputs
 * are to be compressed (and how), and debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing a task's logs, stdout, stderr and syslog.</p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote></p>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  static {
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
      "mapred.task.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
      "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
      "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
      "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   */
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars. */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
      Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *             {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
      JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
      JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
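
  /*
   * Illustrative sketch (not part of the upstream class): setting per-task
   * java opts via the keys above. The @taskid@ token is interpolated by the
   * framework as documented; "conf" is a placeholder JobConf.
   *
   *   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
   *            "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
   *   conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx2048m");
   */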
  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
      "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *             {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
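
  /*
   * Illustrative sketch (not part of the upstream class): passing environment
   * variables to child tasks using the k1=v1,k2=v2 format described above.
   * "conf" is a placeholder JobConf; LD_LIBRARY_PATH is just an example key.
   *
   *   conf.set(JobConf.MAPRED_MAP_TASK_ENV,
   *            "A=foo,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native");
   */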

  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
      JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
      JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead.
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead.
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead.
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * not use it.
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * not use it.
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }


  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }

  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }



  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if the framework should keep the intermediate
   *             files for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *                task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
   * @param numTasks the number of tasks to execute; defaults to 1;
   *                 -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
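
  /*
   * Illustrative sketch (not part of the upstream class): JVM reuse. Per the
   * javadoc above, 1 (the default) gives each task a fresh JVM, a larger
   * value amortizes JVM startup across that many tasks, and -1 removes the
   * limit.
   *
   *   conf.setNumTasksToExecutePerJvm(-1);  // reuse one JVM for all tasks
   */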

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }
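
  /*
   * Illustrative sketch (not part of the upstream class): compressing
   * intermediate map output to cut shuffle traffic. GzipCodec is one of the
   * codecs shipped in org.apache.hadoop.io.compress; note that
   * setMapOutputCompressorClass(...) also enables compression implicitly.
   *
   *   conf.setCompressMapOutput(true);
   *   conf.setMapOutputCompressorClass(
   *       org.apache.hadoop.io.compress.GzipCodec.class);
   */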

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different from the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different from the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedComparator} options.
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }
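
  /*
   * Illustrative sketch (not part of the upstream class): key-field based
   * sorting and partitioning, using the -k pos1[,pos2] syntax documented
   * above. This sorts on the second field, numerically and in reverse, while
   * partitioning on the first field only.
   *
   *   conf.setKeyFieldComparatorOptions("-k2,2nr");
   *   conf.setKeyFieldPartitionerOptions("-k1,1");
   */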

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction with it to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   *                 combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction with it to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }
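
  /*
   * Illustrative sketch (not part of the upstream class): the secondary-sort
   * pattern mentioned above. FullKeyComparator and FirstPartComparator are
   * hypothetical RawComparator implementations: the first orders on the whole
   * composite key, the second only on its natural part, so values arrive at
   * the reducer sorted within each group.
   *
   *   conf.setOutputKeyComparatorClass(FullKeyComparator.class);
   *   conf.setOutputValueGroupingComparator(FirstPartComparator.class);
   */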

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   *
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }

  /**
   * Set the {@link Mapper} class for the job.
   *
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   *
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }

  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   *
   * Typically used to exert greater control on {@link Mapper}s.
   *
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
   * to be sent to the {@link Reducer}s.
   *
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

  /**
   * Set the {@link Partitioner} class used to partition
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   *
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }

  /**
   * Set the {@link Reducer} class for the job.
   *
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
   *
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers.
   *
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   *
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and
   *        output value types</li>
   * </ul>
   *
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job i.e. {@link #setReducerClass(Class)}.</p>
   *
   * @param theClass the user-defined combiner class used to combine
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }

  /**
   * Should speculative execution be used for this job?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used for
   *         this job, <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() {
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be
   *         used for this job for map tasks,
   *         <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() {
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }
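
  /*
   * Illustrative sketch (not part of the upstream class): speculative
   * execution is on by default; a common reason to turn it off is a task that
   * writes to an external, non-idempotent sink, where duplicate speculative
   * attempts would be harmful.
   *
   *   conf.setSpeculativeExecution(false);         // both phases, or...
   *   conf.setReduceSpeculativeExecution(false);   // ...just reduces
   */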

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used
   *         for reduce tasks for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() {
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE,
               speculativeExecution);
  }

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   *
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }

  /**
   * Set the number of map tasks for this job.
   *
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
   * number of spawned map tasks depends on the number of {@link InputSplit}s
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *
   * A custom {@link InputFormat} is typically used to accurately control
   * the number of map tasks for the job.</p>
   *
   * <h4 id="NoOfMaps">How many maps?</h4>
   *
   * <p>The number of maps is usually driven by the total size of the inputs,
   * i.e. the total number of blocks of the input files.</p>
   *
   * <p>The right level of parallelism for maps seems to be around 10-100 maps
   * per-node, although it has been set up to 300 or so for very cpu-light map
   * tasks. Task setup takes a while, so it is best if the maps take at least a
   * minute to execute.</p>
   *
   * <p>The default behavior of file-based {@link InputFormat}s is to split the
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in
   * bytes, of the input files. However, the {@link FileSystem} blocksize of the
   * input files is treated as an upper bound for input splits. A lower bound
   * on the split size can be set via
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
   * used to set it even higher.</p>
   *
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }

  /**
   * Get the configured number of reduce tasks for this job. Defaults to
   * <code>1</code>.
   *
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
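
  /*
   * Worked example for the javadoc above (an approximation, assuming splits
   * equal to whole 128MB blocks): 10TB of input is 10 * 1024 * 1024 MB, and
   * 10,485,760 MB / 128 MB = 81,920 splits, hence roughly 82,000 map tasks.
   */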

  /**
   * Set the requisite number of reduce tasks for this job.
   *
   * <h4 id="NoOfReduces">How many reduces?</h4>
   *
   * <p>The right number of reduces seems to be <code>0.95</code> or
   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
   * </p>
   *
   * <p>With <code>0.95</code> all of the reduces can launch immediately and
   * start transferring map outputs as the maps finish. With <code>1.75</code>
   * the faster nodes will finish their first round of reduces and launch a
   * second wave of reduces doing a much better job of load balancing.</p>
   *
   * <p>Increasing the number of reduces increases the framework overhead, but
   * improves load balancing and lowers the cost of failures.</p>
   *
   * <p>The scaling factors above are slightly less than whole numbers to
   * reserve a few reduce slots in the framework for speculative-tasks, failures
   * etc.</p>
   *
   * <h4 id="ReducerNone">Reducer NONE</h4>
   *
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   *
   * <p>In this case the outputs of the map-tasks go directly to the distributed
   * file-system, to the path set by
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   *
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
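
  /*
   * Worked example for the javadoc above: with 10 nodes and
   * mapreduce.tasktracker.reduce.tasks.maximum = 4, the 0.95 factor gives
   * 0.95 * 10 * 4 = 38 reduces launching in a single wave:
   *
   *   conf.setNumReduceTasks(38);
   */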

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per map task.
   */
  public int getMaxMapAttempts() {
    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }
  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }

  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API. The
   * session identifier is intended, in particular, for use by Hadoop-On-Demand
   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
   * HOD will set the session identifier by modifying the mapred-site.xml file
   * before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
    return get("session.id", "");
  }

  /**
   * Set the user-specified session identifier.
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
    set("session.id", sessionId);
  }

  /**
   * Set the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds <code>noFailures</code>, the
   * tasktracker is <i>blacklisted</i> for this job.
   *
   * @param noFailures maximum no. of failures of a given job per tasktracker.
   */
  public void setMaxTaskFailuresPerTracker(int noFailures) {
    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
  }

  /**
   * Expert: Get the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds this, the tasktracker is
   * <i>blacklisted</i> for this job.
   *
   * @return the maximum no. of failures of a given job per tasktracker.
   */
  public int getMaxTaskFailuresPerTracker() {
    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
  }

  /**
   * Get the maximum percentage of map tasks that can fail without
   * the job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
   * the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of map tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxMapTaskFailuresPercent() {
    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
  }

  /**
   * Expert: Set the maximum percentage of map tasks that can fail without the
   * job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
   * before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of map tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxMapTaskFailuresPercent(int percent) {
    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
  }

  /**
   * Get the maximum percentage of reduce tasks that can fail without
   * the job being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
   * in the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of reduce tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxReduceTaskFailuresPercent() {
    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
  }

  /**
   * Set the maximum percentage of reduce tasks that can fail without the job
   * being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of reduce tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxReduceTaskFailuresPercent(int percent) {
    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
  }

  /**
   * Set {@link JobPriority} for this job.
   *
   * @param prio the {@link JobPriority} for this job.
   */
  public void setJobPriority(JobPriority prio) {
    set(JobContext.PRIORITY, prio.toString());
  }

  /**
   * Get the {@link JobPriority} for this job.
   *
   * @return the {@link JobPriority} for this job.
   */
  public JobPriority getJobPriority() {
    String prio = get(JobContext.PRIORITY);
    if (prio == null) {
      return JobPriority.NORMAL;
    }

    return JobPriority.valueOf(prio);
  }

  /**
   * Set the JobSubmitHostName for this job.
   *
   * @param hostname the JobSubmitHostName for this job.
   */
  void setJobSubmitHostName(String hostname) {
    set(MRJobConfig.JOB_SUBMITHOST, hostname);
  }

  /**
   * Get the JobSubmitHostName for this job.
   *
   * @return the JobSubmitHostName for this job.
   */
  String getJobSubmitHostName() {
    String hostname = get(MRJobConfig.JOB_SUBMITHOST);

    return hostname;
  }

  /**
   * Set the JobSubmitHostAddress for this job.
   *
   * @param hostadd the JobSubmitHostAddress for this job.
   */
  void setJobSubmitHostAddress(String hostadd) {
    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
  }

  /**
   * Get the JobSubmitHostAddress for this job.
   *
   * @return the JobSubmitHostAddress for this job.
   */
  String getJobSubmitHostAddress() {
    String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);

    return hostadd;
  }

  /**
   * Get whether task profiling is enabled.
   * @return true if some tasks will be profiled
   */
  public boolean getProfileEnabled() {
    return getBoolean(JobContext.TASK_PROFILE, false);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    setBoolean(JobContext.TASK_PROFILE, newValue);
  }

  /**
   * Get the profiler configuration arguments.
   *
   * The default value for this property is
   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
   *
   * @return the parameters to pass to the task child to configure profiling
   */
  public String getProfileParams() {
    return get(JobContext.TASK_PROFILE_PARAMS,
               "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
               "verbose=n,file=%s");
  }
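
  /*
   * Illustrative sketch (not part of the upstream class): profiling the first
   * three map-task attempts with the default hprof parameters shown above.
   *
   *   conf.setProfileEnabled(true);
   *   conf.setProfileTaskRange(true, "0-2");   // true = map tasks
   *   conf.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
   *       "force=n,thread=y,verbose=n,file=%s");
   */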
  /**
   * Set the debug script to run when the map tasks fail.
   *
   * <p>The debug script can aid debugging of failed map tasks. The script is
   * given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the map failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setMapDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(
   *     new URI("/debug/scripts/myscript#myscript"), job);
   * </pre></blockquote></p>
   *
   * @param mDbgScript the script name
   */
  public void setMapDebugScript(String mDbgScript) {
    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
  }

  /**
   * Get the map task's debug script.
   *
   * @return the debug script for the mapred job for failed map tasks.
   * @see #setMapDebugScript(String)
   */
  public String getMapDebugScript() {
    return get(JobContext.MAP_DEBUG_SCRIPT);
  }

  /**
   * Set the debug script to run when the reduce tasks fail.
   *
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the reduce failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script file needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setReduceDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(
   *     new URI("/debug/scripts/myscript#myscript"), job);
   * </pre></blockquote></p>
   *
   * @param rDbgScript the script name
   */
  public void setReduceDebugScript(String rDbgScript) {
    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
  }

  /**
   * Get the reduce task's debug script.
   *
   * @return the debug script for the mapred job for failed reduce tasks.
   * @see #setReduceDebugScript(String)
   */
  public String getReduceDebugScript() {
    return get(JobContext.REDUCE_DEBUG_SCRIPT);
  }

  /**
   * Get the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * @return the job end notification uri, <code>null</code> if it hasn't
   *         been set.
   * @see #setJobEndNotificationURI(String)
   */
  public String getJobEndNotificationURI() {
    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
  }

  /**
   * Set the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
   * identifier and completion-status respectively.</p>
   *
   * <p>This is typically used by application-writers to implement chaining of
   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
   *
   * @param uri the job end notification uri
   * @see JobStatus
   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">
   *      Job Completion and Chaining</a>
   */
  public void setJobEndNotificationURI(String uri) {
    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
  }
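  // Illustrative usage sketch (not part of the original source): registering
  // a job-end notification URL; $jobId and $jobStatus are expanded by the
  // framework as documented above. The host and path are hypothetical.
  //
  //   job.setJobEndNotificationURI(
  //       "http://workflow.example.com/notify?id=$jobId&status=$jobStatus");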
  /**
   * Get the job-specific shared directory for use as scratch space.
   *
   * <p>When a job starts, a shared directory is created at the location
   * <code>
   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/</code>.
   * This directory is exposed to the users through
   * <code>mapreduce.job.local.dir</code>, so tasks can use it as scratch
   * space and share files among themselves.</p>
   *
   * This value is also available as a system property.
   *
   * @return the localized job-specific shared directory
   */
  public String getJobLocalDir() {
    return get(JobContext.JOB_LOCAL_DIR);
  }

  /**
   * Get memory required to run a map task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   *
   * @return memory required to run a map task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForMapTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    // In case M/R 1.x applications use the old property name
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  /**
   * Set the memory required to run a map task of the job, in MB.
   *
   * @param mem memory required for a map task, in MB.
   */
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Get memory required to run a reduce task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   *
   * @return memory required to run a reduce task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForReduceTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    // In case M/R 1.x applications use the old property name
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  // Return the value set for the key MAPRED_TASK_MAXVMEM_PROPERTY,
  // converted into MB.
  // Returns DISABLED_MEMORY_LIMIT if unset, or if set to a negative value.
  private long getDeprecatedMemoryValue() {
    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
        DISABLED_MEMORY_LIMIT);
    oldValue = normalizeMemoryConfigValue(oldValue);
    if (oldValue != DISABLED_MEMORY_LIMIT) {
      oldValue /= (1024 * 1024);
    }
    return oldValue;
  }

  /**
   * Set the memory required to run a reduce task of the job, in MB.
   *
   * @param mem memory required for a reduce task, in MB.
   */
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }
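  // Illustrative usage sketch (not part of the original source): requesting
  // per-task memory in MB through the setters above (the values shown are
  // hypothetical). Both the mapreduce.* and the legacy mapred.* property
  // names are updated by these calls.
  //
  //   job.setMemoryForMapTask(1536);     // map tasks may use up to 1536 MB
  //   job.setMemoryForReduceTask(3072);  // reduce tasks may use up to 3072 MB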
  /**
   * Return the name of the queue to which this job is submitted.
   * Defaults to 'default'.
   *
   * @return name of the queue
   */
  public String getQueueName() {
    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
  }

  /**
   * Set the name of the queue to which this job should be submitted.
   *
   * @param queueName name of the queue
   */
  public void setQueueName(String queueName) {
    set(JobContext.QUEUE_NAME, queueName);
  }

  /**
   * Normalize a negative memory configuration value to
   * {@link #DISABLED_MEMORY_LIMIT}.
   *
   * @param val the configured value
   * @return the normalized value
   */
  public static long normalizeMemoryConfigValue(long val) {
    if (val < 0) {
      val = DISABLED_MEMORY_LIMIT;
    }
    return val;
  }

  /**
   * Compute the number of slots required to run a single map task-attempt
   * of this job.
   *
   * @param slotSizePerMap cluster-wide value of the amount of memory required
   *                       to run a map-task
   * @return the number of slots required to run a single map task-attempt,
   *         or 1 if memory parameters are disabled.
   */
  int computeNumSlotsPerMap(long slotSizePerMap) {
    if ((slotSizePerMap == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForMapTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil((float) getMemoryForMapTask() /
                           (float) slotSizePerMap);
  }

  /**
   * Compute the number of slots required to run a single reduce task-attempt
   * of this job.
   *
   * @param slotSizePerReduce cluster-wide value of the amount of memory
   *                          required to run a reduce-task
   * @return the number of slots required to run a single reduce task-attempt,
   *         or 1 if memory parameters are disabled.
   */
  int computeNumSlotsPerReduce(long slotSizePerReduce) {
    if ((slotSizePerReduce == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForReduceTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil((float) getMemoryForReduceTask() /
                           (float) slotSizePerReduce);
  }
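  // Worked example (added for illustration): if getMemoryForMapTask() returns
  // 1536 MB and the cluster-wide map slot size is 1024 MB, then
  // computeNumSlotsPerMap(1024) returns ceil(1536 / 1024) = 2 slots. If either
  // value is DISABLED_MEMORY_LIMIT, the computation is skipped and 1 slot is
  // assumed. The reduce-side computation is symmetric.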
  /**
   * Find a jar that contains a class of the same name, if any.
   * It will return a jar file, even if that is not the first thing
   * on the class path that has a class with the same name.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  public static String findContainingJar(Class my_class) {
    return ClassUtil.findContainingJar(my_class);
  }
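  // Illustrative usage sketch (not part of the original source): one way a
  // client might use findContainingJar() to locate and register the jar that
  // ships a job's classes ("MyMapper" is hypothetical).
  //
  //   String jar = JobConf.findContainingJar(MyMapper.class);
  //   if (jar != null) {
  //     job.setJar(jar);
  //   }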
  /**
   * Get the memory required to run a task of this job, in bytes. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p/>
   * This method is deprecated. Now, different memory limits can be
   * set for map and reduce tasks of a job, in MB.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
   * Otherwise, this method will return the larger of the values returned by
   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
   * after converting them into bytes.
   *
   * @return memory required to run a task of this job, in bytes,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   * @see #setMaxVirtualMemoryForTask(long)
   * @deprecated Use {@link #getMemoryForMapTask()} and
   *             {@link #getMemoryForReduceTask()}
   */
  @Deprecated
  public long getMaxVirtualMemoryForTask() {
    LOG.warn(
      "getMaxVirtualMemoryForTask() is deprecated. " +
      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");

    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
    value = normalizeMemoryConfigValue(value);
    if (value == DISABLED_MEMORY_LIMIT) {
      value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
      value = normalizeMemoryConfigValue(value);
      if (value != DISABLED_MEMORY_LIMIT) {
        value *= 1024 * 1024;
      }
    }
    return value;
  }

  /**
   * Set the maximum amount of memory any task of this job can use. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p/>
   * mapred.task.maxvmem is split into mapreduce.map.memory.mb and
   * mapreduce.reduce.memory.mb; each of the new keys is set to
   * mapred.task.maxvmem / (1024 * 1024), as the new values are in MB.
   *
   * @param vmem Maximum amount of virtual memory in bytes any task of this job
   *             can use.
   * @see #getMaxVirtualMemoryForTask()
   * @deprecated Use {@link #setMemoryForMapTask(long mem)} and
   *             {@link #setMemoryForReduceTask(long mem)}
   */
  @Deprecated
  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
             "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
    }

    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024)); // Convert bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // Convert bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
  }

  /**
   * @deprecated this method is deprecated and no longer in use.
   */
  @Deprecated
  public long getMaxPhysicalMemoryForTask() {
    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
             + " Refer to the APIs getMemoryForMapTask() and"
             + " getMemoryForReduceTask() for details.");
    return -1;
  }

  /**
   * @deprecated this method is deprecated and no longer in use; the value
   * passed is ignored.
   */
  @Deprecated
  public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
             + " The value set is ignored. Refer to"
             + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
  }

  static String deprecatedString(String key) {
    return "The variable " + key + " is no longer used.";
  }

  // Warn if any deprecated memory-related properties are present in the
  // configuration.
  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
               + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
               + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
  }

}