001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021
022 import java.io.IOException;
023 import java.util.regex.Pattern;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceAudience.Private;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileStatus;
032 import org.apache.hadoop.fs.FileSystem;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.io.LongWritable;
035 import org.apache.hadoop.io.RawComparator;
036 import org.apache.hadoop.io.Text;
037 import org.apache.hadoop.io.WritableComparable;
038 import org.apache.hadoop.io.WritableComparator;
039 import org.apache.hadoop.io.compress.CompressionCodec;
040 import org.apache.hadoop.mapred.lib.HashPartitioner;
041 import org.apache.hadoop.mapred.lib.IdentityMapper;
042 import org.apache.hadoop.mapred.lib.IdentityReducer;
043 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045 import org.apache.hadoop.mapreduce.MRConfig;
046 import org.apache.hadoop.mapreduce.MRJobConfig;
047 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048 import org.apache.hadoop.mapreduce.util.ConfigUtil;
049 import org.apache.hadoop.security.Credentials;
050 import org.apache.hadoop.util.ClassUtil;
051 import org.apache.hadoop.util.ReflectionUtils;
052 import org.apache.hadoop.util.Tool;
053 import org.apache.log4j.Level;
054
055 /**
056 * A map/reduce job configuration.
057 *
058 * <p><code>JobConf</code> is the primary interface for a user to describe a
059 * map-reduce job to the Hadoop framework for execution. The framework tries to
060 * faithfully execute the job as-is described by <code>JobConf</code>, however:
061 * <ol>
062 * <li>
063 * Some configuration parameters might have been marked as
064 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065 * final</a> by administrators and hence cannot be altered.
066 * </li>
067 * <li>
068 * While some job parameters are straight-forward to set
069 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
070 * rest of the framework and/or job-configuration and is relatively more
071 * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
072 * </li>
073 * </ol></p>
074 *
075 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
076 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
077 * {@link OutputFormat} implementations to be used etc.
078 *
079 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
080 * of the job such as <code>Comparator</code>s to be used, files to be put in
081 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
082 * are to be compressed (and how), debugability via user-provided scripts
083 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
084 * for doing post-processing on task logs, task's stdout, stderr, syslog.
085 * and etc.</p>
086 *
087 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
088 * <p><blockquote><pre>
089 * // Create a new JobConf
090 * JobConf job = new JobConf(new Configuration(), MyJob.class);
091 *
092 * // Specify various job-specific parameters
093 * job.setJobName("myjob");
094 *
095 * FileInputFormat.setInputPaths(job, new Path("in"));
096 * FileOutputFormat.setOutputPath(job, new Path("out"));
097 *
098 * job.setMapperClass(MyJob.MyMapper.class);
099 * job.setCombinerClass(MyJob.MyReducer.class);
100 * job.setReducerClass(MyJob.MyReducer.class);
101 *
102 * job.setInputFormat(SequenceFileInputFormat.class);
103 * job.setOutputFormat(SequenceFileOutputFormat.class);
104 * </pre></blockquote></p>
105 *
106 * @see JobClient
107 * @see ClusterStatus
108 * @see Tool
109 * @see DistributedCache
110 */
111 @InterfaceAudience.Public
112 @InterfaceStability.Stable
113 public class JobConf extends Configuration {
114
115 private static final Log LOG = LogFactory.getLog(JobConf.class);
116
117 static{
118 ConfigUtil.loadResources();
119 }
120
121 /**
122 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
123 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
124 */
125 @Deprecated
126 public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
127 "mapred.task.maxvmem";
128
129 /**
130 * @deprecated
131 */
132 @Deprecated
133 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
134 "mapred.task.limit.maxvmem";
135
136 /**
137 * @deprecated
138 */
139 @Deprecated
140 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
141 "mapred.task.default.maxvmem";
142
143 /**
144 * @deprecated
145 */
146 @Deprecated
147 public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
148 "mapred.task.maxpmem";
149
150 /**
151 * A value which if set for memory related configuration options,
152 * indicates that the options are turned off.
153 */
154 public static final long DISABLED_MEMORY_LIMIT = -1L;
155
156 /**
157 * Property name for the configuration property mapreduce.cluster.local.dir
158 */
159 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
160
161 /**
162 * Name of the queue to which jobs will be submitted, if no queue
163 * name is mentioned.
164 */
165 public static final String DEFAULT_QUEUE_NAME = "default";
166
167 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
168 JobContext.MAP_MEMORY_MB;
169
170 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
171 JobContext.REDUCE_MEMORY_MB;
172
173 /**
174 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
175 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
176 */
177 @Deprecated
178 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
179 "mapred.job.map.memory.mb";
180
181 /**
182 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
183 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
184 */
185 @Deprecated
186 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
187 "mapred.job.reduce.memory.mb";
188
189 /** Pattern for the default unpacking behavior for job jars */
190 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
191 Pattern.compile("(?:classes/|lib/).*");
192
193 /**
194 * Configuration key to set the java command line options for the child
195 * map and reduce tasks.
196 *
197 * Java opts for the task tracker child processes.
198 * The following symbol, if present, will be interpolated: @taskid@.
199 * It is replaced by current TaskID. Any other occurrences of '@' will go
200 * unchanged.
201 * For example, to enable verbose gc logging to a file named for the taskid in
202 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
203 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
204 *
205 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
206 * other environment variables to the child processes.
207 *
208 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
209 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
210 */
211 @Deprecated
212 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
213
214 /**
215 * Configuration key to set the java command line options for the map tasks.
216 *
217 * Java opts for the task tracker child map processes.
218 * The following symbol, if present, will be interpolated: @taskid@.
219 * It is replaced by current TaskID. Any other occurrences of '@' will go
220 * unchanged.
221 * For example, to enable verbose gc logging to a file named for the taskid in
222 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
223 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
224 *
225 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
226 * other environment variables to the map processes.
227 */
228 public static final String MAPRED_MAP_TASK_JAVA_OPTS =
229 JobContext.MAP_JAVA_OPTS;
230
231 /**
232 * Configuration key to set the java command line options for the reduce tasks.
233 *
234 * Java opts for the task tracker child reduce processes.
235 * The following symbol, if present, will be interpolated: @taskid@.
236 * It is replaced by current TaskID. Any other occurrences of '@' will go
237 * unchanged.
238 * For example, to enable verbose gc logging to a file named for the taskid in
239 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
240 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
241 *
242 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
243 * pass process environment variables to the reduce processes.
244 */
245 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
246 JobContext.REDUCE_JAVA_OPTS;
247
248 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
249
250 /**
251 * @deprecated
252 * Configuration key to set the maximum virtual memory available to the child
253 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
254 * longer have any effect.
255 */
256 @Deprecated
257 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
258
259 /**
260 * @deprecated
261 * Configuration key to set the maximum virtual memory available to the
262 * map tasks (in kilo-bytes). This has been deprecated and will no
263 * longer have any effect.
264 */
265 @Deprecated
266 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
267
268 /**
269 * @deprecated
270 * Configuration key to set the maximum virtual memory available to the
271 * reduce tasks (in kilo-bytes). This has been deprecated and will no
272 * longer have any effect.
273 */
274 @Deprecated
275 public static final String MAPRED_REDUCE_TASK_ULIMIT =
276 "mapreduce.reduce.ulimit";
277
278
279 /**
280 * Configuration key to set the environment of the child map/reduce tasks.
281 *
282 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
283 * reference existing environment variables via <code>$key</code> on
284 * Linux or <code>%key%</code> on Windows.
285 *
286 * Example:
287 * <ul>
288 * <li> A=foo - This will set the env variable A to foo. </li>
289 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
290 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
291 * </ul>
292 *
293 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
294 * {@link #MAPRED_REDUCE_TASK_ENV}
295 */
296 @Deprecated
297 public static final String MAPRED_TASK_ENV = "mapred.child.env";
298
299 /**
300 * Configuration key to set the environment of the child map tasks.
301 *
302 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
303 * reference existing environment variables via <code>$key</code> on
304 * Linux or <code>%key%</code> on Windows.
305 *
306 * Example:
307 * <ul>
308 * <li> A=foo - This will set the env variable A to foo. </li>
309 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
310 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
311 * </ul>
312 */
313 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
314
315 /**
316 * Configuration key to set the environment of the child reduce tasks.
317 *
318 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
319 * reference existing environment variables via <code>$key</code> on
320 * Linux or <code>%key%</code> on Windows.
321 *
322 * Example:
323 * <ul>
324 * <li> A=foo - This will set the env variable A to foo. </li>
325 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li>
326 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li>
327 * </ul>
328 */
329 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
330
331 private Credentials credentials = new Credentials();
332
333 /**
334 * Configuration key to set the logging {@link Level} for the map task.
335 *
336 * The allowed logging levels are:
337 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
338 */
339 public static final String MAPRED_MAP_TASK_LOG_LEVEL =
340 JobContext.MAP_LOG_LEVEL;
341
342 /**
343 * Configuration key to set the logging {@link Level} for the reduce task.
344 *
345 * The allowed logging levels are:
346 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
347 */
348 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
349 JobContext.REDUCE_LOG_LEVEL;
350
351 /**
352 * Default logging level for map/reduce tasks.
353 */
354 public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
355
356 /**
357 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
358 * use {@link MRJobConfig#WORKFLOW_ID} instead
359 */
360 @Deprecated
361 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
362
363 /**
364 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
365 * use {@link MRJobConfig#WORKFLOW_NAME} instead
366 */
367 @Deprecated
368 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
369
370 /**
371 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
372 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
373 */
374 @Deprecated
375 public static final String WORKFLOW_NODE_NAME =
376 MRJobConfig.WORKFLOW_NODE_NAME;
377
378 /**
379 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
380 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
381 */
382 @Deprecated
383 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
384 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
385
386 /**
387 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
388 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
389 */
390 @Deprecated
391 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
392 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
393
394 /**
395 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
396 * use {@link MRJobConfig#WORKFLOW_TAGS} instead
397 */
398 @Deprecated
399 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
400
401 /**
402 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
403 * not use it
404 */
405 @Deprecated
406 public static final String MAPREDUCE_RECOVER_JOB =
407 "mapreduce.job.restart.recover";
408
409 /**
410 * The variable is kept for M/R 1.x applications, M/R 2.x applications should
411 * not use it
412 */
413 @Deprecated
414 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
415
416 /**
417 * Construct a map/reduce job configuration.
418 */
419 public JobConf() {
420 checkAndWarnDeprecation();
421 }
422
423 /**
424 * Construct a map/reduce job configuration.
425 *
426 * @param exampleClass a class whose containing jar is used as the job's jar.
427 */
428 public JobConf(Class exampleClass) {
429 setJarByClass(exampleClass);
430 checkAndWarnDeprecation();
431 }
432
433 /**
434 * Construct a map/reduce job configuration.
435 *
436 * @param conf a Configuration whose settings will be inherited.
437 */
438 public JobConf(Configuration conf) {
439 super(conf);
440
441 if (conf instanceof JobConf) {
442 JobConf that = (JobConf)conf;
443 credentials = that.credentials;
444 }
445
446 checkAndWarnDeprecation();
447 }
448
449
450 /** Construct a map/reduce job configuration.
451 *
452 * @param conf a Configuration whose settings will be inherited.
453 * @param exampleClass a class whose containing jar is used as the job's jar.
454 */
455 public JobConf(Configuration conf, Class exampleClass) {
456 this(conf);
457 setJarByClass(exampleClass);
458 }
459
460
461 /** Construct a map/reduce configuration.
462 *
463 * @param config a Configuration-format XML job description file.
464 */
465 public JobConf(String config) {
466 this(new Path(config));
467 }
468
469 /** Construct a map/reduce configuration.
470 *
471 * @param config a Configuration-format XML job description file.
472 */
473 public JobConf(Path config) {
474 super();
475 addResource(config);
476 checkAndWarnDeprecation();
477 }
478
479 /** A new map/reduce configuration where the behavior of reading from the
480 * default resources can be turned off.
481 * <p/>
482 * If the parameter {@code loadDefaults} is false, the new instance
483 * will not load resources from the default files.
484 *
485 * @param loadDefaults specifies whether to load from the default files
486 */
487 public JobConf(boolean loadDefaults) {
488 super(loadDefaults);
489 checkAndWarnDeprecation();
490 }
491
492 /**
493 * Get credentials for the job.
494 * @return credentials for the job
495 */
496 public Credentials getCredentials() {
497 return credentials;
498 }
499
500 @Private
501 public void setCredentials(Credentials credentials) {
502 this.credentials = credentials;
503 }
504
505 /**
506 * Get the user jar for the map-reduce job.
507 *
508 * @return the user jar for the map-reduce job.
509 */
510 public String getJar() { return get(JobContext.JAR); }
511
512 /**
513 * Set the user jar for the map-reduce job.
514 *
515 * @param jar the user jar for the map-reduce job.
516 */
517 public void setJar(String jar) { set(JobContext.JAR, jar); }
518
519 /**
520 * Get the pattern for jar contents to unpack on the tasktracker
521 */
522 public Pattern getJarUnpackPattern() {
523 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
524 }
525
526
527 /**
528 * Set the job's jar file by finding an example class location.
529 *
530 * @param cls the example class.
531 */
532 public void setJarByClass(Class cls) {
533 String jar = ClassUtil.findContainingJar(cls);
534 if (jar != null) {
535 setJar(jar);
536 }
537 }
538
539 public String[] getLocalDirs() throws IOException {
540 return getTrimmedStrings(MRConfig.LOCAL_DIR);
541 }
542
543 /**
544 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
545 */
546 @Deprecated
547 public void deleteLocalFiles() throws IOException {
548 String[] localDirs = getLocalDirs();
549 for (int i = 0; i < localDirs.length; i++) {
550 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
551 }
552 }
553
554 public void deleteLocalFiles(String subdir) throws IOException {
555 String[] localDirs = getLocalDirs();
556 for (int i = 0; i < localDirs.length; i++) {
557 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
558 }
559 }
560
561 /**
562 * Constructs a local file name. Files are distributed among configured
563 * local directories.
564 */
565 public Path getLocalPath(String pathString) throws IOException {
566 return getLocalPath(MRConfig.LOCAL_DIR, pathString);
567 }
568
569 /**
570 * Get the reported username for this job.
571 *
572 * @return the username
573 */
574 public String getUser() {
575 return get(JobContext.USER_NAME);
576 }
577
578 /**
579 * Set the reported username for this job.
580 *
581 * @param user the username for this job.
582 */
583 public void setUser(String user) {
584 set(JobContext.USER_NAME, user);
585 }
586
587
588
589 /**
590 * Set whether the framework should keep the intermediate files for
591 * failed tasks.
592 *
593 * @param keep <code>true</code> if framework should keep the intermediate files
594 * for failed tasks, <code>false</code> otherwise.
595 *
596 */
597 public void setKeepFailedTaskFiles(boolean keep) {
598 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
599 }
600
601 /**
602 * Should the temporary files for failed tasks be kept?
603 *
604 * @return should the files be kept?
605 */
606 public boolean getKeepFailedTaskFiles() {
607 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
608 }
609
610 /**
611 * Set a regular expression for task names that should be kept.
612 * The regular expression ".*_m_000123_0" would keep the files
613 * for the first instance of map 123 that ran.
614 *
615 * @param pattern the java.util.regex.Pattern to match against the
616 * task names.
617 */
618 public void setKeepTaskFilesPattern(String pattern) {
619 set(JobContext.PRESERVE_FILES_PATTERN, pattern);
620 }
621
622 /**
623 * Get the regular expression that is matched against the task names
624 * to see if we need to keep the files.
625 *
626 * @return the pattern as a string, if it was set, othewise null.
627 */
628 public String getKeepTaskFilesPattern() {
629 return get(JobContext.PRESERVE_FILES_PATTERN);
630 }
631
632 /**
633 * Set the current working directory for the default file system.
634 *
635 * @param dir the new current working directory.
636 */
637 public void setWorkingDirectory(Path dir) {
638 dir = new Path(getWorkingDirectory(), dir);
639 set(JobContext.WORKING_DIR, dir.toString());
640 }
641
642 /**
643 * Get the current working directory for the default file system.
644 *
645 * @return the directory name.
646 */
647 public Path getWorkingDirectory() {
648 String name = get(JobContext.WORKING_DIR);
649 if (name != null) {
650 return new Path(name);
651 } else {
652 try {
653 Path dir = FileSystem.get(this).getWorkingDirectory();
654 set(JobContext.WORKING_DIR, dir.toString());
655 return dir;
656 } catch (IOException e) {
657 throw new RuntimeException(e);
658 }
659 }
660 }
661
662 /**
663 * Sets the number of tasks that a spawned task JVM should run
664 * before it exits
665 * @param numTasks the number of tasks to execute; defaults to 1;
666 * -1 signifies no limit
667 */
668 public void setNumTasksToExecutePerJvm(int numTasks) {
669 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
670 }
671
672 /**
673 * Get the number of tasks that a spawned JVM should execute
674 */
675 public int getNumTasksToExecutePerJvm() {
676 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
677 }
678
679 /**
680 * Get the {@link InputFormat} implementation for the map-reduce job,
681 * defaults to {@link TextInputFormat} if not specified explicity.
682 *
683 * @return the {@link InputFormat} implementation for the map-reduce job.
684 */
685 public InputFormat getInputFormat() {
686 return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
687 TextInputFormat.class,
688 InputFormat.class),
689 this);
690 }
691
692 /**
693 * Set the {@link InputFormat} implementation for the map-reduce job.
694 *
695 * @param theClass the {@link InputFormat} implementation for the map-reduce
696 * job.
697 */
698 public void setInputFormat(Class<? extends InputFormat> theClass) {
699 setClass("mapred.input.format.class", theClass, InputFormat.class);
700 }
701
702 /**
703 * Get the {@link OutputFormat} implementation for the map-reduce job,
704 * defaults to {@link TextOutputFormat} if not specified explicity.
705 *
706 * @return the {@link OutputFormat} implementation for the map-reduce job.
707 */
708 public OutputFormat getOutputFormat() {
709 return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
710 TextOutputFormat.class,
711 OutputFormat.class),
712 this);
713 }
714
715 /**
716 * Get the {@link OutputCommitter} implementation for the map-reduce job,
717 * defaults to {@link FileOutputCommitter} if not specified explicitly.
718 *
719 * @return the {@link OutputCommitter} implementation for the map-reduce job.
720 */
721 public OutputCommitter getOutputCommitter() {
722 return (OutputCommitter)ReflectionUtils.newInstance(
723 getClass("mapred.output.committer.class", FileOutputCommitter.class,
724 OutputCommitter.class), this);
725 }
726
727 /**
728 * Set the {@link OutputCommitter} implementation for the map-reduce job.
729 *
730 * @param theClass the {@link OutputCommitter} implementation for the map-reduce
731 * job.
732 */
733 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
734 setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
735 }
736
737 /**
738 * Set the {@link OutputFormat} implementation for the map-reduce job.
739 *
740 * @param theClass the {@link OutputFormat} implementation for the map-reduce
741 * job.
742 */
743 public void setOutputFormat(Class<? extends OutputFormat> theClass) {
744 setClass("mapred.output.format.class", theClass, OutputFormat.class);
745 }
746
747 /**
748 * Should the map outputs be compressed before transfer?
749 *
750 * @param compress should the map outputs be compressed?
751 */
752 public void setCompressMapOutput(boolean compress) {
753 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
754 }
755
756 /**
757 * Are the outputs of the maps be compressed?
758 *
759 * @return <code>true</code> if the outputs of the maps are to be compressed,
760 * <code>false</code> otherwise.
761 */
762 public boolean getCompressMapOutput() {
763 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
764 }
765
766 /**
767 * Set the given class as the {@link CompressionCodec} for the map outputs.
768 *
769 * @param codecClass the {@link CompressionCodec} class that will compress
770 * the map outputs.
771 */
772 public void
773 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
774 setCompressMapOutput(true);
775 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
776 CompressionCodec.class);
777 }
778
779 /**
780 * Get the {@link CompressionCodec} for compressing the map outputs.
781 *
782 * @param defaultValue the {@link CompressionCodec} to return if not set
783 * @return the {@link CompressionCodec} class that should be used to compress the
784 * map outputs.
785 * @throws IllegalArgumentException if the class was specified, but not found
786 */
787 public Class<? extends CompressionCodec>
788 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
789 Class<? extends CompressionCodec> codecClass = defaultValue;
790 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
791 if (name != null) {
792 try {
793 codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
794 } catch (ClassNotFoundException e) {
795 throw new IllegalArgumentException("Compression codec " + name +
796 " was not found.", e);
797 }
798 }
799 return codecClass;
800 }
801
802 /**
803 * Get the key class for the map output data. If it is not set, use the
804 * (final) output key class. This allows the map output key class to be
805 * different than the final output key class.
806 *
807 * @return the map output key class.
808 */
809 public Class<?> getMapOutputKeyClass() {
810 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
811 if (retv == null) {
812 retv = getOutputKeyClass();
813 }
814 return retv;
815 }
816
817 /**
818 * Set the key class for the map output data. This allows the user to
819 * specify the map output key class to be different than the final output
820 * value class.
821 *
822 * @param theClass the map output key class.
823 */
824 public void setMapOutputKeyClass(Class<?> theClass) {
825 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
826 }
827
828 /**
829 * Get the value class for the map output data. If it is not set, use the
830 * (final) output value class This allows the map output value class to be
831 * different than the final output value class.
832 *
833 * @return the map output value class.
834 */
835 public Class<?> getMapOutputValueClass() {
836 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
837 Object.class);
838 if (retv == null) {
839 retv = getOutputValueClass();
840 }
841 return retv;
842 }
843
844 /**
845 * Set the value class for the map output data. This allows the user to
846 * specify the map output value class to be different than the final output
847 * value class.
848 *
849 * @param theClass the map output value class.
850 */
851 public void setMapOutputValueClass(Class<?> theClass) {
852 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
853 }
854
855 /**
856 * Get the key class for the job output data.
857 *
858 * @return the key class for the job output data.
859 */
860 public Class<?> getOutputKeyClass() {
861 return getClass(JobContext.OUTPUT_KEY_CLASS,
862 LongWritable.class, Object.class);
863 }
864
865 /**
866 * Set the key class for the job output data.
867 *
868 * @param theClass the key class for the job output data.
869 */
870 public void setOutputKeyClass(Class<?> theClass) {
871 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
872 }
873
874 /**
875 * Get the {@link RawComparator} comparator used to compare keys.
876 *
877 * @return the {@link RawComparator} comparator used to compare keys.
878 */
879 public RawComparator getOutputKeyComparator() {
880 Class<? extends RawComparator> theClass = getClass(
881 JobContext.KEY_COMPARATOR, null, RawComparator.class);
882 if (theClass != null)
883 return ReflectionUtils.newInstance(theClass, this);
884 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
885 }
886
887 /**
888 * Set the {@link RawComparator} comparator used to compare keys.
889 *
890 * @param theClass the {@link RawComparator} comparator used to
891 * compare keys.
892 * @see #setOutputValueGroupingComparator(Class)
893 */
894 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
895 setClass(JobContext.KEY_COMPARATOR,
896 theClass, RawComparator.class);
897 }
898
899 /**
900 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
901 *
902 * @param keySpec the key specification of the form -k pos1[,pos2], where,
903 * pos is of the form f[.c][opts], where f is the number
904 * of the key field to use, and c is the number of the first character from
905 * the beginning of the field. Fields and character posns are numbered
906 * starting with 1; a character position of zero in pos2 indicates the
907 * field's last character. If '.c' is omitted from pos1, it defaults to 1
908 * (the beginning of the field); if omitted from pos2, it defaults to 0
909 * (the end of the field). opts are ordering options. The supported options
910 * are:
911 * -n, (Sort numerically)
912 * -r, (Reverse the result of comparison)
913 */
914 public void setKeyFieldComparatorOptions(String keySpec) {
915 setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
916 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
917 }
918
919 /**
920 * Get the {@link KeyFieldBasedComparator} options
921 */
922 public String getKeyFieldComparatorOption() {
923 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
924 }
925
926 /**
927 * Set the {@link KeyFieldBasedPartitioner} options used for
928 * {@link Partitioner}
929 *
930 * @param keySpec the key specification of the form -k pos1[,pos2], where,
931 * pos is of the form f[.c][opts], where f is the number
932 * of the key field to use, and c is the number of the first character from
933 * the beginning of the field. Fields and character posns are numbered
934 * starting with 1; a character position of zero in pos2 indicates the
935 * field's last character. If '.c' is omitted from pos1, it defaults to 1
936 * (the beginning of the field); if omitted from pos2, it defaults to 0
937 * (the end of the field).
938 */
939 public void setKeyFieldPartitionerOptions(String keySpec) {
940 setPartitionerClass(KeyFieldBasedPartitioner.class);
941 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
942 }
943
944 /**
945 * Get the {@link KeyFieldBasedPartitioner} options
946 */
947 public String getKeyFieldPartitionerOption() {
948 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
949 }
950
951 /**
952 * Get the user defined {@link WritableComparable} comparator for
953 * grouping keys of inputs to the combiner.
954 *
955 * @return comparator set by the user for grouping values.
956 * @see #setCombinerKeyGroupingComparator(Class) for details.
957 */
958 public RawComparator getCombinerKeyGroupingComparator() {
959 Class<? extends RawComparator> theClass = getClass(
960 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
961 if (theClass == null) {
962 return getOutputKeyComparator();
963 }
964
965 return ReflectionUtils.newInstance(theClass, this);
966 }
967
968 /**
969 * Get the user defined {@link WritableComparable} comparator for
970 * grouping keys of inputs to the reduce.
971 *
972 * @return comparator set by the user for grouping values.
973 * @see #setOutputValueGroupingComparator(Class) for details.
974 */
975 public RawComparator getOutputValueGroupingComparator() {
976 Class<? extends RawComparator> theClass = getClass(
977 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
978 if (theClass == null) {
979 return getOutputKeyComparator();
980 }
981
982 return ReflectionUtils.newInstance(theClass, this);
983 }
984
985 /**
986 * Set the user defined {@link RawComparator} comparator for
987 * grouping keys in the input to the combiner.
988 * <p/>
989 * <p>This comparator should be provided if the equivalence rules for keys
990 * for sorting the intermediates are different from those for grouping keys
991 * before each call to
992 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
993 * <p/>
994 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
995 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
996 * <p/>
997 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
998 * how keys are sorted, this can be used in conjunction to simulate
999 * <i>secondary sort on values</i>.</p>
1000 * <p/>
1001 * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1002 * <i>stable</i> in any sense. (In any case, with the order of available
1003 * map-outputs to the combiner being non-deterministic, it wouldn't make
1004 * that much sense.)</p>
1005 *
1006 * @param theClass the comparator class to be used for grouping keys for the
1007 * combiner. It should implement <code>RawComparator</code>.
1008 * @see #setOutputKeyComparatorClass(Class)
1009 */
1010 public void setCombinerKeyGroupingComparator(
1011 Class<? extends RawComparator> theClass) {
1012 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1013 theClass, RawComparator.class);
1014 }
1015
1016 /**
1017 * Set the user defined {@link RawComparator} comparator for
1018 * grouping keys in the input to the reduce.
1019 *
1020 * <p>This comparator should be provided if the equivalence rules for keys
1021 * for sorting the intermediates are different from those for grouping keys
1022 * before each call to
1023 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1024 *
1025 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1026 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1027 *
1028 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1029 * how keys are sorted, this can be used in conjunction to simulate
1030 * <i>secondary sort on values</i>.</p>
1031 *
1032 * <p><i>Note</i>: This is not a guarantee of the reduce sort being
1033 * <i>stable</i> in any sense. (In any case, with the order of available
1034 * map-outputs to the reduce being non-deterministic, it wouldn't make
1035 * that much sense.)</p>
1036 *
1037 * @param theClass the comparator class to be used for grouping keys.
1038 * It should implement <code>RawComparator</code>.
1039 * @see #setOutputKeyComparatorClass(Class)
1040 * @see #setCombinerKeyGroupingComparator(Class)
1041 */
1042 public void setOutputValueGroupingComparator(
1043 Class<? extends RawComparator> theClass) {
1044 setClass(JobContext.GROUP_COMPARATOR_CLASS,
1045 theClass, RawComparator.class);
1046 }
1047
1048 /**
1049 * Should the framework use the new context-object code for running
1050 * the mapper?
1051 * @return true, if the new api should be used
1052 */
1053 public boolean getUseNewMapper() {
1054 return getBoolean("mapred.mapper.new-api", false);
1055 }
1056 /**
1057 * Set whether the framework should use the new api for the mapper.
1058 * This is the default for jobs submitted with the new Job api.
1059 * @param flag true, if the new api should be used
1060 */
1061 public void setUseNewMapper(boolean flag) {
1062 setBoolean("mapred.mapper.new-api", flag);
1063 }
1064
1065 /**
1066 * Should the framework use the new context-object code for running
1067 * the reducer?
1068 * @return true, if the new api should be used
1069 */
1070 public boolean getUseNewReducer() {
1071 return getBoolean("mapred.reducer.new-api", false);
1072 }
1073 /**
1074 * Set whether the framework should use the new api for the reducer.
1075 * This is the default for jobs submitted with the new Job api.
1076 * @param flag true, if the new api should be used
1077 */
1078 public void setUseNewReducer(boolean flag) {
1079 setBoolean("mapred.reducer.new-api", flag);
1080 }
1081
1082 /**
1083 * Get the value class for job outputs.
1084 *
1085 * @return the value class for job outputs.
1086 */
1087 public Class<?> getOutputValueClass() {
1088 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1089 }
1090
1091 /**
1092 * Set the value class for job outputs.
1093 *
1094 * @param theClass the value class for job outputs.
1095 */
1096 public void setOutputValueClass(Class<?> theClass) {
1097 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1098 }
1099
1100 /**
1101 * Get the {@link Mapper} class for the job.
1102 *
1103 * @return the {@link Mapper} class for the job.
1104 */
1105 public Class<? extends Mapper> getMapperClass() {
1106 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1107 }
1108
1109 /**
1110 * Set the {@link Mapper} class for the job.
1111 *
1112 * @param theClass the {@link Mapper} class for the job.
1113 */
1114 public void setMapperClass(Class<? extends Mapper> theClass) {
1115 setClass("mapred.mapper.class", theClass, Mapper.class);
1116 }
1117
1118 /**
1119 * Get the {@link MapRunnable} class for the job.
1120 *
1121 * @return the {@link MapRunnable} class for the job.
1122 */
1123 public Class<? extends MapRunnable> getMapRunnerClass() {
1124 return getClass("mapred.map.runner.class",
1125 MapRunner.class, MapRunnable.class);
1126 }
1127
1128 /**
1129 * Expert: Set the {@link MapRunnable} class for the job.
1130 *
1131 * Typically used to exert greater control on {@link Mapper}s.
1132 *
1133 * @param theClass the {@link MapRunnable} class for the job.
1134 */
1135 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1136 setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1137 }
1138
1139 /**
1140 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
1141 * to be sent to the {@link Reducer}s.
1142 *
1143 * @return the {@link Partitioner} used to partition map-outputs.
1144 */
1145 public Class<? extends Partitioner> getPartitionerClass() {
1146 return getClass("mapred.partitioner.class",
1147 HashPartitioner.class, Partitioner.class);
1148 }
1149
1150 /**
1151 * Set the {@link Partitioner} class used to partition
1152 * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1153 *
1154 * @param theClass the {@link Partitioner} used to partition map-outputs.
1155 */
1156 public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1157 setClass("mapred.partitioner.class", theClass, Partitioner.class);
1158 }
1159
1160 /**
1161 * Get the {@link Reducer} class for the job.
1162 *
1163 * @return the {@link Reducer} class for the job.
1164 */
1165 public Class<? extends Reducer> getReducerClass() {
1166 return getClass("mapred.reducer.class",
1167 IdentityReducer.class, Reducer.class);
1168 }
1169
1170 /**
1171 * Set the {@link Reducer} class for the job.
1172 *
1173 * @param theClass the {@link Reducer} class for the job.
1174 */
1175 public void setReducerClass(Class<? extends Reducer> theClass) {
1176 setClass("mapred.reducer.class", theClass, Reducer.class);
1177 }
1178
1179 /**
1180 * Get the user-defined <i>combiner</i> class used to combine map-outputs
1181 * before being sent to the reducers. Typically the combiner is same as the
1182 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
1183 *
1184 * @return the user-defined combiner class used to combine map-outputs.
1185 */
1186 public Class<? extends Reducer> getCombinerClass() {
1187 return getClass("mapred.combiner.class", null, Reducer.class);
1188 }
1189
1190 /**
1191 * Set the user-defined <i>combiner</i> class used to combine map-outputs
1192 * before being sent to the reducers.
1193 *
1194 * <p>The combiner is an application-specified aggregation operation, which
1195 * can help cut down the amount of data transferred between the
1196 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1197 *
1198 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1199 * the mapper and reducer tasks. In general, the combiner is called as the
1200 * sort/merge result is written to disk. The combiner must:
1201 * <ul>
1202 * <li> be side-effect free</li>
1203 * <li> have the same input and output key types and the same input and
1204 * output value types</li>
1205 * </ul></p>
1206 *
1207 * <p>Typically the combiner is same as the <code>Reducer</code> for the
1208 * job i.e. {@link #setReducerClass(Class)}.</p>
1209 *
1210 * @param theClass the user-defined combiner class used to combine
1211 * map-outputs.
1212 */
1213 public void setCombinerClass(Class<? extends Reducer> theClass) {
1214 setClass("mapred.combiner.class", theClass, Reducer.class);
1215 }
1216
1217 /**
1218 * Should speculative execution be used for this job?
1219 * Defaults to <code>true</code>.
1220 *
1221 * @return <code>true</code> if speculative execution be used for this job,
1222 * <code>false</code> otherwise.
1223 */
1224 public boolean getSpeculativeExecution() {
1225 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1226 }
1227
1228 /**
1229 * Turn speculative execution on or off for this job.
1230 *
1231 * @param speculativeExecution <code>true</code> if speculative execution
1232 * should be turned on, else <code>false</code>.
1233 */
1234 public void setSpeculativeExecution(boolean speculativeExecution) {
1235 setMapSpeculativeExecution(speculativeExecution);
1236 setReduceSpeculativeExecution(speculativeExecution);
1237 }
1238
1239 /**
1240 * Should speculative execution be used for this job for map tasks?
1241 * Defaults to <code>true</code>.
1242 *
1243 * @return <code>true</code> if speculative execution be
1244 * used for this job for map tasks,
1245 * <code>false</code> otherwise.
1246 */
1247 public boolean getMapSpeculativeExecution() {
1248 return getBoolean(JobContext.MAP_SPECULATIVE, true);
1249 }
1250
1251 /**
1252 * Turn speculative execution on or off for this job for map tasks.
1253 *
1254 * @param speculativeExecution <code>true</code> if speculative execution
1255 * should be turned on for map tasks,
1256 * else <code>false</code>.
1257 */
1258 public void setMapSpeculativeExecution(boolean speculativeExecution) {
1259 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1260 }
1261
1262 /**
1263 * Should speculative execution be used for this job for reduce tasks?
1264 * Defaults to <code>true</code>.
1265 *
1266 * @return <code>true</code> if speculative execution be used
1267 * for reduce tasks for this job,
1268 * <code>false</code> otherwise.
1269 */
1270 public boolean getReduceSpeculativeExecution() {
1271 return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1272 }
1273
1274 /**
1275 * Turn speculative execution on or off for this job for reduce tasks.
1276 *
1277 * @param speculativeExecution <code>true</code> if speculative execution
1278 * should be turned on for reduce tasks,
1279 * else <code>false</code>.
1280 */
1281 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1282 setBoolean(JobContext.REDUCE_SPECULATIVE,
1283 speculativeExecution);
1284 }
1285
1286 /**
1287 * Get configured the number of reduce tasks for this job.
1288 * Defaults to <code>1</code>.
1289 *
1290 * @return the number of reduce tasks for this job.
1291 */
1292 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1293
1294 /**
1295 * Set the number of map tasks for this job.
1296 *
1297 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
1298 * number of spawned map tasks depends on the number of {@link InputSplit}s
1299 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1300 *
1301 * A custom {@link InputFormat} is typically used to accurately control
1302 * the number of map tasks for the job.</p>
1303 *
1304 * <h4 id="NoOfMaps">How many maps?</h4>
1305 *
1306 * <p>The number of maps is usually driven by the total size of the inputs
1307 * i.e. total number of blocks of the input files.</p>
1308 *
1309 * <p>The right level of parallelism for maps seems to be around 10-100 maps
1310 * per-node, although it has been set up to 300 or so for very cpu-light map
1311 * tasks. Task setup takes awhile, so it is best if the maps take at least a
1312 * minute to execute.</p>
1313 *
1314 * <p>The default behavior of file-based {@link InputFormat}s is to split the
1315 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
1316 * bytes, of input files. However, the {@link FileSystem} blocksize of the
1317 * input files is treated as an upper bound for input splits. A lower bound
1318 * on the split size can be set via
1319 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1320 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1321 *
1322 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
1323 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
1324 * used to set it even higher.</p>
1325 *
1326 * @param n the number of map tasks for this job.
1327 * @see InputFormat#getSplits(JobConf, int)
1328 * @see FileInputFormat
1329 * @see FileSystem#getDefaultBlockSize()
1330 * @see FileStatus#getBlockSize()
1331 */
1332 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1333
1334 /**
1335 * Get configured the number of reduce tasks for this job. Defaults to
1336 * <code>1</code>.
1337 *
1338 * @return the number of reduce tasks for this job.
1339 */
1340 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1341
1342 /**
1343 * Set the requisite number of reduce tasks for this job.
1344 *
1345 * <h4 id="NoOfReduces">How many reduces?</h4>
1346 *
1347 * <p>The right number of reduces seems to be <code>0.95</code> or
1348 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
1349 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1350 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1351 * </p>
1352 *
1353 * <p>With <code>0.95</code> all of the reduces can launch immediately and
1354 * start transfering map outputs as the maps finish. With <code>1.75</code>
1355 * the faster nodes will finish their first round of reduces and launch a
1356 * second wave of reduces doing a much better job of load balancing.</p>
1357 *
1358 * <p>Increasing the number of reduces increases the framework overhead, but
1359 * increases load balancing and lowers the cost of failures.</p>
1360 *
1361 * <p>The scaling factors above are slightly less than whole numbers to
1362 * reserve a few reduce slots in the framework for speculative-tasks, failures
1363 * etc.</p>
1364 *
1365 * <h4 id="ReducerNone">Reducer NONE</h4>
1366 *
1367 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1368 *
1369 * <p>In this case the output of the map-tasks directly go to distributed
1370 * file-system, to the path set by
1371 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
1372 * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
1373 *
1374 * @param n the number of reduce tasks for this job.
1375 */
1376 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1377
1378 /**
1379 * Get the configured number of maximum attempts that will be made to run a
1380 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1381 * property. If this property is not already set, the default is 4 attempts.
1382 *
1383 * @return the max number of attempts per map task.
1384 */
1385 public int getMaxMapAttempts() {
1386 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1387 }
1388
1389 /**
1390 * Expert: Set the number of maximum attempts that will be made to run a
1391 * map task.
1392 *
1393 * @param n the number of attempts per map task.
1394 */
1395 public void setMaxMapAttempts(int n) {
1396 setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1397 }
1398
1399 /**
1400 * Get the configured number of maximum attempts that will be made to run a
1401 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1402 * property. If this property is not already set, the default is 4 attempts.
1403 *
1404 * @return the max number of attempts per reduce task.
1405 */
1406 public int getMaxReduceAttempts() {
1407 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1408 }
1409 /**
1410 * Expert: Set the number of maximum attempts that will be made to run a
1411 * reduce task.
1412 *
1413 * @param n the number of attempts per reduce task.
1414 */
1415 public void setMaxReduceAttempts(int n) {
1416 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1417 }
1418
1419 /**
1420 * Get the user-specified job name. This is only used to identify the
1421 * job to the user.
1422 *
1423 * @return the job's name, defaulting to "".
1424 */
1425 public String getJobName() {
1426 return get(JobContext.JOB_NAME, "");
1427 }
1428
1429 /**
1430 * Set the user-specified job name.
1431 *
1432 * @param name the job's new name.
1433 */
1434 public void setJobName(String name) {
1435 set(JobContext.JOB_NAME, name);
1436 }
1437
1438 /**
1439 * Get the user-specified session identifier. The default is the empty string.
1440 *
1441 * The session identifier is used to tag metric data that is reported to some
1442 * performance metrics system via the org.apache.hadoop.metrics API. The
1443 * session identifier is intended, in particular, for use by Hadoop-On-Demand
1444 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
1445 * HOD will set the session identifier by modifying the mapred-site.xml file
1446 * before starting the cluster.
1447 *
1448 * When not running under HOD, this identifer is expected to remain set to
1449 * the empty string.
1450 *
1451 * @return the session identifier, defaulting to "".
1452 */
1453 @Deprecated
1454 public String getSessionId() {
1455 return get("session.id", "");
1456 }
1457
1458 /**
1459 * Set the user-specified session identifier.
1460 *
1461 * @param sessionId the new session id.
1462 */
1463 @Deprecated
1464 public void setSessionId(String sessionId) {
1465 set("session.id", sessionId);
1466 }
1467
1468 /**
1469 * Set the maximum no. of failures of a given job per tasktracker.
1470 * If the no. of task failures exceeds <code>noFailures</code>, the
1471 * tasktracker is <i>blacklisted</i> for this job.
1472 *
1473 * @param noFailures maximum no. of failures of a given job per tasktracker.
1474 */
1475 public void setMaxTaskFailuresPerTracker(int noFailures) {
1476 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1477 }
1478
1479 /**
1480 * Expert: Get the maximum no. of failures of a given job per tasktracker.
1481 * If the no. of task failures exceeds this, the tasktracker is
1482 * <i>blacklisted</i> for this job.
1483 *
1484 * @return the maximum no. of failures of a given job per tasktracker.
1485 */
1486 public int getMaxTaskFailuresPerTracker() {
1487 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1488 }
1489
1490 /**
1491 * Get the maximum percentage of map tasks that can fail without
1492 * the job being aborted.
1493 *
1494 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
1495 * attempts before being declared as <i>failed</i>.
1496 *
1497 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1498 * the job being declared as {@link JobStatus#FAILED}.
1499 *
1500 * @return the maximum percentage of map tasks that can fail without
1501 * the job being aborted.
1502 */
1503 public int getMaxMapTaskFailuresPercent() {
1504 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1505 }
1506
1507 /**
1508 * Expert: Set the maximum percentage of map tasks that can fail without the
1509 * job being aborted.
1510 *
1511 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
1512 * before being declared as <i>failed</i>.
1513 *
1514 * @param percent the maximum percentage of map tasks that can fail without
1515 * the job being aborted.
1516 */
1517 public void setMaxMapTaskFailuresPercent(int percent) {
1518 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1519 }
1520
1521 /**
1522 * Get the maximum percentage of reduce tasks that can fail without
1523 * the job being aborted.
1524 *
1525 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1526 * attempts before being declared as <i>failed</i>.
1527 *
1528 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
1529 * in the job being declared as {@link JobStatus#FAILED}.
1530 *
1531 * @return the maximum percentage of reduce tasks that can fail without
1532 * the job being aborted.
1533 */
1534 public int getMaxReduceTaskFailuresPercent() {
1535 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1536 }
1537
1538 /**
1539 * Set the maximum percentage of reduce tasks that can fail without the job
1540 * being aborted.
1541 *
1542 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1543 * attempts before being declared as <i>failed</i>.
1544 *
1545 * @param percent the maximum percentage of reduce tasks that can fail without
1546 * the job being aborted.
1547 */
1548 public void setMaxReduceTaskFailuresPercent(int percent) {
1549 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1550 }
1551
1552 /**
1553 * Set {@link JobPriority} for this job.
1554 *
1555 * @param prio the {@link JobPriority} for this job.
1556 */
1557 public void setJobPriority(JobPriority prio) {
1558 set(JobContext.PRIORITY, prio.toString());
1559 }
1560
1561 /**
1562 * Get the {@link JobPriority} for this job.
1563 *
1564 * @return the {@link JobPriority} for this job.
1565 */
1566 public JobPriority getJobPriority() {
1567 String prio = get(JobContext.PRIORITY);
1568 if(prio == null) {
1569 return JobPriority.NORMAL;
1570 }
1571
1572 return JobPriority.valueOf(prio);
1573 }
1574
1575 /**
1576 * Set JobSubmitHostName for this job.
1577 *
1578 * @param hostname the JobSubmitHostName for this job.
1579 */
1580 void setJobSubmitHostName(String hostname) {
1581 set(MRJobConfig.JOB_SUBMITHOST, hostname);
1582 }
1583
1584 /**
1585 * Get the JobSubmitHostName for this job.
1586 *
1587 * @return the JobSubmitHostName for this job.
1588 */
1589 String getJobSubmitHostName() {
1590 String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1591
1592 return hostname;
1593 }
1594
1595 /**
1596 * Set JobSubmitHostAddress for this job.
1597 *
1598 * @param hostadd the JobSubmitHostAddress for this job.
1599 */
1600 void setJobSubmitHostAddress(String hostadd) {
1601 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1602 }
1603
1604 /**
1605 * Get JobSubmitHostAddress for this job.
1606 *
1607 * @return JobSubmitHostAddress for this job.
1608 */
1609 String getJobSubmitHostAddress() {
1610 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1611
1612 return hostadd;
1613 }
1614
1615 /**
1616 * Get whether the task profiling is enabled.
1617 * @return true if some tasks will be profiled
1618 */
1619 public boolean getProfileEnabled() {
1620 return getBoolean(JobContext.TASK_PROFILE, false);
1621 }
1622
1623 /**
1624 * Set whether the system should collect profiler information for some of
1625 * the tasks in this job? The information is stored in the user log
1626 * directory.
1627 * @param newValue true means it should be gathered
1628 */
1629 public void setProfileEnabled(boolean newValue) {
1630 setBoolean(JobContext.TASK_PROFILE, newValue);
1631 }
1632
1633 /**
1634 * Get the profiler configuration arguments.
1635 *
1636 * The default value for this property is
1637 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1638 *
1639 * @return the parameters to pass to the task child to configure profiling
1640 */
1641 public String getProfileParams() {
1642 return get(JobContext.TASK_PROFILE_PARAMS,
1643 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1644 "verbose=n,file=%s");
1645 }
1646
1647 /**
1648 * Set the profiler configuration arguments. If the string contains a '%s' it
1649 * will be replaced with the name of the profiling output file when the task
1650 * runs.
1651 *
1652 * This value is passed to the task child JVM on the command line.
1653 *
1654 * @param value the configuration string
1655 */
1656 public void setProfileParams(String value) {
1657 set(JobContext.TASK_PROFILE_PARAMS, value);
1658 }
1659
1660 /**
1661 * Get the range of maps or reduces to profile.
1662 * @param isMap is the task a map?
1663 * @return the task ranges
1664 */
1665 public IntegerRanges getProfileTaskRange(boolean isMap) {
1666 return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
1667 JobContext.NUM_REDUCE_PROFILES), "0-2");
1668 }
1669
1670 /**
1671 * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
1672 * must also be called.
1673 * @param newValue a set of integer ranges of the map ids
1674 */
1675 public void setProfileTaskRange(boolean isMap, String newValue) {
1676 // parse the value to make sure it is legal
1677 new Configuration.IntegerRanges(newValue);
1678 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
1679 newValue);
1680 }
1681
1682 /**
1683 * Set the debug script to run when the map tasks fail.
1684 *
1685 * <p>The debug script can aid debugging of failed map tasks. The script is
1686 * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1687 *
1688 * <p>The debug command, run on the node where the map failed, is:</p>
1689 * <p><pre><blockquote>
1690 * $script $stdout $stderr $syslog $jobconf.
1691 * </blockquote></pre></p>
1692 *
1693 * <p> The script file is distributed through {@link DistributedCache}
1694 * APIs. The script needs to be symlinked. </p>
1695 *
1696 * <p>Here is an example on how to submit a script
1697 * <p><blockquote><pre>
1698 * job.setMapDebugScript("./myscript");
1699 * DistributedCache.createSymlink(job);
1700 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1701 * </pre></blockquote></p>
1702 *
1703 * @param mDbgScript the script name
1704 */
1705 public void setMapDebugScript(String mDbgScript) {
1706 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1707 }
1708
1709 /**
1710 * Get the map task's debug script.
1711 *
1712 * @return the debug Script for the mapred job for failed map tasks.
1713 * @see #setMapDebugScript(String)
1714 */
1715 public String getMapDebugScript() {
1716 return get(JobContext.MAP_DEBUG_SCRIPT);
1717 }
1718
1719 /**
1720 * Set the debug script to run when the reduce tasks fail.
1721 *
1722 * <p>The debug script can aid debugging of failed reduce tasks. The script
1723 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
1724 *
1725 * <p>The debug command, run on the node where the map failed, is:</p>
1726 * <p><pre><blockquote>
1727 * $script $stdout $stderr $syslog $jobconf.
1728 * </blockquote></pre></p>
1729 *
1730 * <p> The script file is distributed through {@link DistributedCache}
1731 * APIs. The script file needs to be symlinked </p>
1732 *
1733 * <p>Here is an example on how to submit a script
1734 * <p><blockquote><pre>
1735 * job.setReduceDebugScript("./myscript");
1736 * DistributedCache.createSymlink(job);
1737 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
1738 * </pre></blockquote></p>
1739 *
1740 * @param rDbgScript the script name
1741 */
1742 public void setReduceDebugScript(String rDbgScript) {
1743 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1744 }
1745
1746 /**
1747 * Get the reduce task's debug Script
1748 *
1749 * @return the debug script for the mapred job for failed reduce tasks.
1750 * @see #setReduceDebugScript(String)
1751 */
1752 public String getReduceDebugScript() {
1753 return get(JobContext.REDUCE_DEBUG_SCRIPT);
1754 }
1755
1756 /**
1757 * Get the uri to be invoked in-order to send a notification after the job
1758 * has completed (success/failure).
1759 *
1760 * @return the job end notification uri, <code>null</code> if it hasn't
1761 * been set.
1762 * @see #setJobEndNotificationURI(String)
1763 */
1764 public String getJobEndNotificationURI() {
1765 return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1766 }
1767
1768 /**
1769 * Set the uri to be invoked in-order to send a notification after the job
1770 * has completed (success/failure).
1771 *
1772 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
1773 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
1774 * identifier and completion-status respectively.</p>
1775 *
1776 * <p>This is typically used by application-writers to implement chaining of
1777 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1778 *
1779 * @param uri the job end notification uri
1780 * @see JobStatus
1781 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1782 * JobCompletionAndChaining">Job Completion and Chaining</a>
1783 */
1784 public void setJobEndNotificationURI(String uri) {
1785 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1786 }
1787
1788 /**
1789 * Get job-specific shared directory for use as scratch space
1790 *
1791 * <p>
1792 * When a job starts, a shared directory is created at location
1793 * <code>
1794 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
1795 * This directory is exposed to the users through
1796 * <code>mapreduce.job.local.dir </code>.
1797 * So, the tasks can use this space
1798 * as scratch space and share files among them. </p>
1799 * This value is available as System property also.
1800 *
1801 * @return The localized job specific shared directory
1802 */
1803 public String getJobLocalDir() {
1804 return get(JobContext.JOB_LOCAL_DIR);
1805 }
1806
1807 /**
1808 * Get memory required to run a map task of the job, in MB.
1809 *
1810 * If a value is specified in the configuration, it is returned.
1811 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1812 * <p/>
1813 * For backward compatibility, if the job configuration sets the
1814 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1815 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1816 * after converting it from bytes to MB.
1817 * @return memory required to run a map task of the job, in MB,
1818 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1819 */
1820 public long getMemoryForMapTask() {
1821 long value = getDeprecatedMemoryValue();
1822 if (value == DISABLED_MEMORY_LIMIT) {
1823 value = normalizeMemoryConfigValue(
1824 getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
1825 DISABLED_MEMORY_LIMIT));
1826 }
1827 // In case that M/R 1.x applications use the old property name
1828 if (value == DISABLED_MEMORY_LIMIT) {
1829 value = normalizeMemoryConfigValue(
1830 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1831 DISABLED_MEMORY_LIMIT));
1832 }
1833 return value;
1834 }
1835
1836 public void setMemoryForMapTask(long mem) {
1837 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1838 // In case that M/R 1.x applications use the old property name
1839 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1840 }
1841
1842 /**
1843 * Get memory required to run a reduce task of the job, in MB.
1844 *
1845 * If a value is specified in the configuration, it is returned.
1846 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1847 * <p/>
1848 * For backward compatibility, if the job configuration sets the
1849 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1850 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1851 * after converting it from bytes to MB.
1852 * @return memory required to run a reduce task of the job, in MB,
1853 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1854 */
1855 public long getMemoryForReduceTask() {
1856 long value = getDeprecatedMemoryValue();
1857 if (value == DISABLED_MEMORY_LIMIT) {
1858 value = normalizeMemoryConfigValue(
1859 getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
1860 DISABLED_MEMORY_LIMIT));
1861 }
1862 // In case that M/R 1.x applications use the old property name
1863 if (value == DISABLED_MEMORY_LIMIT) {
1864 value = normalizeMemoryConfigValue(
1865 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1866 DISABLED_MEMORY_LIMIT));
1867 }
1868 return value;
1869 }
1870
1871 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1872 // converted into MBs.
1873 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1874 // value.
1875 private long getDeprecatedMemoryValue() {
1876 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1877 DISABLED_MEMORY_LIMIT);
1878 oldValue = normalizeMemoryConfigValue(oldValue);
1879 if (oldValue != DISABLED_MEMORY_LIMIT) {
1880 oldValue /= (1024*1024);
1881 }
1882 return oldValue;
1883 }
1884
1885 public void setMemoryForReduceTask(long mem) {
1886 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1887 // In case that M/R 1.x applications use the old property name
1888 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1889 }
1890
1891 /**
1892 * Return the name of the queue to which this job is submitted.
1893 * Defaults to 'default'.
1894 *
1895 * @return name of the queue
1896 */
1897 public String getQueueName() {
1898 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1899 }
1900
1901 /**
1902 * Set the name of the queue to which this job should be submitted.
1903 *
1904 * @param queueName Name of the queue
1905 */
1906 public void setQueueName(String queueName) {
1907 set(JobContext.QUEUE_NAME, queueName);
1908 }
1909
1910 /**
1911 * Normalize the negative values in configuration
1912 *
1913 * @param val
1914 * @return normalized value
1915 */
1916 public static long normalizeMemoryConfigValue(long val) {
1917 if (val < 0) {
1918 val = DISABLED_MEMORY_LIMIT;
1919 }
1920 return val;
1921 }
1922
1923 /**
1924 * Compute the number of slots required to run a single map task-attempt
1925 * of this job.
1926 * @param slotSizePerMap cluster-wide value of the amount of memory required
1927 * to run a map-task
1928 * @return the number of slots required to run a single map task-attempt
1929 * 1 if memory parameters are disabled.
1930 */
1931 int computeNumSlotsPerMap(long slotSizePerMap) {
1932 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
1933 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
1934 return 1;
1935 }
1936 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
1937 }
1938
1939 /**
1940 * Compute the number of slots required to run a single reduce task-attempt
1941 * of this job.
1942 * @param slotSizePerReduce cluster-wide value of the amount of memory
1943 * required to run a reduce-task
1944 * @return the number of slots required to run a single reduce task-attempt
1945 * 1 if memory parameters are disabled
1946 */
1947 int computeNumSlotsPerReduce(long slotSizePerReduce) {
1948 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
1949 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
1950 return 1;
1951 }
1952 return
1953 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
1954 }
1955
1956 /**
1957 * Find a jar that contains a class of the same name, if any.
1958 * It will return a jar file, even if that is not the first thing
1959 * on the class path that has a class with the same name.
1960 *
1961 * @param my_class the class to find.
1962 * @return a jar file that contains the class, or null.
1963 * @throws IOException
1964 */
1965 public static String findContainingJar(Class my_class) {
1966 return ClassUtil.findContainingJar(my_class);
1967 }
1968
1969 /**
1970 * Get the memory required to run a task of this job, in bytes. See
1971 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1972 * <p/>
1973 * This method is deprecated. Now, different memory limits can be
1974 * set for map and reduce tasks of a job, in MB.
1975 * <p/>
1976 * For backward compatibility, if the job configuration sets the
1977 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1978 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
1979 * Otherwise, this method will return the larger of the values returned by
1980 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1981 * after converting them into bytes.
1982 *
1983 * @return Memory required to run a task of this job, in bytes,
1984 * or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1985 * @see #setMaxVirtualMemoryForTask(long)
1986 * @deprecated Use {@link #getMemoryForMapTask()} and
1987 * {@link #getMemoryForReduceTask()}
1988 */
1989 @Deprecated
1990 public long getMaxVirtualMemoryForTask() {
1991 LOG.warn(
1992 "getMaxVirtualMemoryForTask() is deprecated. " +
1993 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1994
1995 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1996 value = normalizeMemoryConfigValue(value);
1997 if (value == DISABLED_MEMORY_LIMIT) {
1998 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1999 value = normalizeMemoryConfigValue(value);
2000 if (value != DISABLED_MEMORY_LIMIT) {
2001 value *= 1024*1024;
2002 }
2003 }
2004 return value;
2005 }
2006
2007 /**
2008 * Set the maximum amount of memory any task of this job can use. See
2009 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2010 * <p/>
2011 * mapred.task.maxvmem is split into
2012 * mapreduce.map.memory.mb
2013 * and mapreduce.map.memory.mb,mapred
2014 * each of the new key are set
2015 * as mapred.task.maxvmem / 1024
2016 * as new values are in MB
2017 *
2018 * @param vmem Maximum amount of virtual memory in bytes any task of this job
2019 * can use.
2020 * @see #getMaxVirtualMemoryForTask()
2021 * @deprecated
2022 * Use {@link #setMemoryForMapTask(long mem)} and
2023 * Use {@link #setMemoryForReduceTask(long mem)}
2024 */
2025 @Deprecated
2026 public void setMaxVirtualMemoryForTask(long vmem) {
2027 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
2028 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
2029 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
2030 setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
2031 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
2032 }
2033
2034 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
2035 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
2036 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
2037 }else{
2038 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
2039 }
2040 }
2041
2042 /**
2043 * @deprecated this variable is deprecated and nolonger in use.
2044 */
2045 @Deprecated
2046 public long getMaxPhysicalMemoryForTask() {
2047 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2048 + " Refer to the APIs getMemoryForMapTask() and"
2049 + " getMemoryForReduceTask() for details.");
2050 return -1;
2051 }
2052
2053 /*
2054 * @deprecated this
2055 */
2056 @Deprecated
2057 public void setMaxPhysicalMemoryForTask(long mem) {
2058 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2059 + " The value set is ignored. Refer to "
2060 + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2061 }
2062
2063 static String deprecatedString(String key) {
2064 return "The variable " + key + " is no longer used.";
2065 }
2066
2067 private void checkAndWarnDeprecation() {
2068 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
2069 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
2070 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
2071 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
2072 }
2073 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
2074 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
2075 }
2076 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
2077 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
2078 }
2079 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
2080 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
2081 }
2082 }
2083
2084
2085 }
2086