001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021
022 import java.io.IOException;
023 import java.util.regex.Pattern;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceAudience.Private;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileStatus;
032 import org.apache.hadoop.fs.FileSystem;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.io.LongWritable;
035 import org.apache.hadoop.io.RawComparator;
036 import org.apache.hadoop.io.Text;
037 import org.apache.hadoop.io.WritableComparable;
038 import org.apache.hadoop.io.WritableComparator;
039 import org.apache.hadoop.io.compress.CompressionCodec;
040 import org.apache.hadoop.mapred.lib.HashPartitioner;
041 import org.apache.hadoop.mapred.lib.IdentityMapper;
042 import org.apache.hadoop.mapred.lib.IdentityReducer;
043 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
044 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
045 import org.apache.hadoop.mapreduce.MRConfig;
046 import org.apache.hadoop.mapreduce.MRJobConfig;
047 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
048 import org.apache.hadoop.mapreduce.util.ConfigUtil;
049 import org.apache.hadoop.security.Credentials;
050 import org.apache.hadoop.util.ClassUtil;
051 import org.apache.hadoop.util.ReflectionUtils;
052 import org.apache.hadoop.util.Tool;
053 import org.apache.log4j.Level;
054
055 /**
056 * A map/reduce job configuration.
057 *
058 * <p><code>JobConf</code> is the primary interface for a user to describe a
059 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
061 * <ol>
062 * <li>
063 * Some configuration parameters might have been marked as
064 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
065 * final</a> by administrators and hence cannot be altered.
066 * </li>
067 * <li>
 * While some job parameters are straightforward to set
 * (e.g. {@link #setNumReduceTasks(int)}), other parameters interact subtly
 * with the rest of the framework and/or job configuration and are
 * relatively more complex for the user to control finely
 * (e.g. {@link #setNumMapTasks(int)}).
073 * </li>
074 * </ol></p>
075 *
076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
078 * {@link OutputFormat} implementations to be used etc.
079 *
080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
081 * of the job such as <code>Comparator</code>s to be used, files to be put in
082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing task logs, the task's stdout, stderr, and syslog,
 * and so on.</p>
087 *
 * <p>Here is an example of how to configure a job via <code>JobConf</code>:</p>
089 * <p><blockquote><pre>
090 * // Create a new JobConf
091 * JobConf job = new JobConf(new Configuration(), MyJob.class);
092 *
093 * // Specify various job-specific parameters
094 * job.setJobName("myjob");
095 *
096 * FileInputFormat.setInputPaths(job, new Path("in"));
097 * FileOutputFormat.setOutputPath(job, new Path("out"));
098 *
099 * job.setMapperClass(MyJob.MyMapper.class);
100 * job.setCombinerClass(MyJob.MyReducer.class);
101 * job.setReducerClass(MyJob.MyReducer.class);
102 *
103 * job.setInputFormat(SequenceFileInputFormat.class);
104 * job.setOutputFormat(SequenceFileOutputFormat.class);
105 * </pre></blockquote></p>
106 *
107 * @see JobClient
108 * @see ClusterStatus
109 * @see Tool
110 * @see DistributedCache
111 */
112 @InterfaceAudience.Public
113 @InterfaceStability.Stable
114 public class JobConf extends Configuration {
115
116 private static final Log LOG = LogFactory.getLog(JobConf.class);
117
118 static{
119 ConfigUtil.loadResources();
120 }
121
122 /**
123 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
124 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
125 */
126 @Deprecated
127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
128 "mapred.task.maxvmem";
129
130 /**
131 * @deprecated
132 */
133 @Deprecated
134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
135 "mapred.task.limit.maxvmem";
136
137 /**
138 * @deprecated
139 */
140 @Deprecated
141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
142 "mapred.task.default.maxvmem";
143
144 /**
145 * @deprecated
146 */
147 @Deprecated
148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
149 "mapred.task.maxpmem";
150
151 /**
 * A value which, if set for memory-related configuration options,
 * indicates that the options are turned off.
154 */
155 public static final long DISABLED_MEMORY_LIMIT = -1L;
156
157 /**
158 * Property name for the configuration property mapreduce.cluster.local.dir
159 */
160 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
161
162 /**
163 * Name of the queue to which jobs will be submitted, if no queue
164 * name is mentioned.
165 */
166 public static final String DEFAULT_QUEUE_NAME = "default";
167
168 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
169 JobContext.MAP_MEMORY_MB;
170
171 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
172 JobContext.REDUCE_MEMORY_MB;
173
174 /**
175 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
176 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
177 */
178 @Deprecated
179 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
180 "mapred.job.map.memory.mb";
181
182 /**
183 * The variable is kept for M/R 1.x applications, while M/R 2.x applications
184 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
185 */
186 @Deprecated
187 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
188 "mapred.job.reduce.memory.mb";
189
190 /** Pattern for the default unpacking behavior for job jars */
191 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
192 Pattern.compile("(?:classes/|lib/).*");
193
194 /**
195 * Configuration key to set the java command line options for the child
196 * map and reduce tasks.
197 *
198 * Java opts for the task tracker child processes.
199 * The following symbol, if present, will be interpolated: @taskid@.
200 * It is replaced by current TaskID. Any other occurrences of '@' will go
201 * unchanged.
202 * For example, to enable verbose gc logging to a file named for the taskid in
203 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
204 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
205 *
206 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
207 * other environment variables to the child processes.
208 *
209 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
210 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
211 */
212 @Deprecated
213 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
214
215 /**
216 * Configuration key to set the java command line options for the map tasks.
217 *
218 * Java opts for the task tracker child map processes.
219 * The following symbol, if present, will be interpolated: @taskid@.
220 * It is replaced by current TaskID. Any other occurrences of '@' will go
221 * unchanged.
222 * For example, to enable verbose gc logging to a file named for the taskid in
223 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
224 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
225 *
226 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
227 * other environment variables to the map processes.
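 *
 * <p>For example (an illustrative setting; <code>conf</code> is a
 * <code>JobConf</code> instance):</p>
 * <p><blockquote><pre>
 * conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
 *          "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
 * </pre></blockquote></p>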
228 */
229 public static final String MAPRED_MAP_TASK_JAVA_OPTS =
230 JobContext.MAP_JAVA_OPTS;
231
232 /**
233 * Configuration key to set the java command line options for the reduce tasks.
234 *
235 * Java opts for the task tracker child reduce processes.
236 * The following symbol, if present, will be interpolated: @taskid@.
237 * It is replaced by current TaskID. Any other occurrences of '@' will go
238 * unchanged.
239 * For example, to enable verbose gc logging to a file named for the taskid in
240 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
241 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
242 *
243 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
244 * pass process environment variables to the reduce processes.
245 */
246 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
247 JobContext.REDUCE_JAVA_OPTS;
248
249 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
250
251 /**
252 * @deprecated
253 * Configuration key to set the maximum virtual memory available to the child
254 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
255 * longer have any effect.
256 */
257 @Deprecated
258 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
259
260 /**
261 * @deprecated
262 * Configuration key to set the maximum virtual memory available to the
263 * map tasks (in kilo-bytes). This has been deprecated and will no
264 * longer have any effect.
265 */
266 @Deprecated
267 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
268
269 /**
270 * @deprecated
271 * Configuration key to set the maximum virtual memory available to the
272 * reduce tasks (in kilo-bytes). This has been deprecated and will no
273 * longer have any effect.
274 */
275 @Deprecated
276 public static final String MAPRED_REDUCE_TASK_ULIMIT =
277 "mapreduce.reduce.ulimit";
278
279
280 /**
281 * Configuration key to set the environment of the child map/reduce tasks.
282 *
283 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
284 * reference existing environment variables via <code>$key</code> on
285 * Linux or <code>%key%</code> on Windows.
286 *
287 * Example:
288 * <ul>
 * <li> A=foo - This will set the env variable A to foo. </li>
 * <li> B=$X:c - On Linux, this inherits the tasktracker's X env variable
 * and appends ":c" to it. </li>
 * <li> B=%X%;c - On Windows, this inherits the tasktracker's X env variable
 * and appends ";c" to it. </li>
292 * </ul>
293 *
294 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
295 * {@link #MAPRED_REDUCE_TASK_ENV}
296 */
297 @Deprecated
298 public static final String MAPRED_TASK_ENV = "mapred.child.env";
299
300 /**
301 * Configuration key to set the environment of the child map tasks.
302 *
303 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
304 * reference existing environment variables via <code>$key</code> on
305 * Linux or <code>%key%</code> on Windows.
306 *
307 * Example:
308 * <ul>
 * <li> A=foo - This will set the env variable A to foo. </li>
 * <li> B=$X:c - On Linux, this inherits the tasktracker's X env variable
 * and appends ":c" to it. </li>
 * <li> B=%X%;c - On Windows, this inherits the tasktracker's X env variable
 * and appends ";c" to it. </li>
312 * </ul>
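 *
 * <p>For example (an illustrative setting; <code>conf</code> is a
 * <code>JobConf</code> instance):</p>
 * <p><blockquote><pre>
 * conf.set(JobConf.MAPRED_MAP_TASK_ENV, "A=foo,B=$X:c");
 * </pre></blockquote></p>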
313 */
314 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
315
316 /**
317 * Configuration key to set the environment of the child reduce tasks.
318 *
319 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
320 * reference existing environment variables via <code>$key</code> on
321 * Linux or <code>%key%</code> on Windows.
322 *
323 * Example:
324 * <ul>
 * <li> A=foo - This will set the env variable A to foo. </li>
 * <li> B=$X:c - On Linux, this inherits the tasktracker's X env variable
 * and appends ":c" to it. </li>
 * <li> B=%X%;c - On Windows, this inherits the tasktracker's X env variable
 * and appends ";c" to it. </li>
328 * </ul>
329 */
330 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
331
332 private Credentials credentials = new Credentials();
333
334 /**
335 * Configuration key to set the logging {@link Level} for the map task.
336 *
337 * The allowed logging levels are:
338 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
339 */
340 public static final String MAPRED_MAP_TASK_LOG_LEVEL =
341 JobContext.MAP_LOG_LEVEL;
342
343 /**
344 * Configuration key to set the logging {@link Level} for the reduce task.
345 *
346 * The allowed logging levels are:
347 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
348 */
349 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
350 JobContext.REDUCE_LOG_LEVEL;
351
352 /**
353 * Default logging level for map/reduce tasks.
354 */
355 public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
356
357 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_ID} instead.
360 */
361 @Deprecated
362 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;
363
364 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_NAME} instead.
367 */
368 @Deprecated
369 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;
370
371 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead.
374 */
375 @Deprecated
376 public static final String WORKFLOW_NODE_NAME =
377 MRJobConfig.WORKFLOW_NODE_NAME;
378
379 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead.
382 */
383 @Deprecated
384 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
385 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;
386
387 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead.
390 */
391 @Deprecated
392 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
393 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;
394
395 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * use {@link MRJobConfig#WORKFLOW_TAGS} instead.
398 */
399 @Deprecated
400 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;
401
402 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * not use it.
405 */
406 @Deprecated
407 public static final String MAPREDUCE_RECOVER_JOB =
408 "mapreduce.job.restart.recover";
409
410 /**
 * The variable is kept for M/R 1.x applications; M/R 2.x applications should
 * not use it.
413 */
414 @Deprecated
415 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
416
417 /**
418 * Construct a map/reduce job configuration.
419 */
420 public JobConf() {
421 checkAndWarnDeprecation();
422 }
423
424 /**
425 * Construct a map/reduce job configuration.
426 *
427 * @param exampleClass a class whose containing jar is used as the job's jar.
428 */
429 public JobConf(Class exampleClass) {
430 setJarByClass(exampleClass);
431 checkAndWarnDeprecation();
432 }
433
434 /**
435 * Construct a map/reduce job configuration.
436 *
437 * @param conf a Configuration whose settings will be inherited.
438 */
439 public JobConf(Configuration conf) {
440 super(conf);
441
442 if (conf instanceof JobConf) {
443 JobConf that = (JobConf)conf;
444 credentials = that.credentials;
445 }
446
447 checkAndWarnDeprecation();
448 }
449
450
451 /** Construct a map/reduce job configuration.
452 *
453 * @param conf a Configuration whose settings will be inherited.
454 * @param exampleClass a class whose containing jar is used as the job's jar.
455 */
456 public JobConf(Configuration conf, Class exampleClass) {
457 this(conf);
458 setJarByClass(exampleClass);
459 }
460
461
462 /** Construct a map/reduce configuration.
463 *
464 * @param config a Configuration-format XML job description file.
465 */
466 public JobConf(String config) {
467 this(new Path(config));
468 }
469
470 /** Construct a map/reduce configuration.
471 *
472 * @param config a Configuration-format XML job description file.
473 */
474 public JobConf(Path config) {
475 super();
476 addResource(config);
477 checkAndWarnDeprecation();
478 }
479
480 /** A new map/reduce configuration where the behavior of reading from the
481 * default resources can be turned off.
 *
483 * If the parameter {@code loadDefaults} is false, the new instance
484 * will not load resources from the default files.
485 *
486 * @param loadDefaults specifies whether to load from the default files
487 */
488 public JobConf(boolean loadDefaults) {
489 super(loadDefaults);
490 checkAndWarnDeprecation();
491 }
492
493 /**
494 * Get credentials for the job.
495 * @return credentials for the job
496 */
497 public Credentials getCredentials() {
498 return credentials;
499 }
500
501 @Private
502 public void setCredentials(Credentials credentials) {
503 this.credentials = credentials;
504 }
505
506 /**
507 * Get the user jar for the map-reduce job.
508 *
509 * @return the user jar for the map-reduce job.
510 */
511 public String getJar() { return get(JobContext.JAR); }
512
513 /**
514 * Set the user jar for the map-reduce job.
515 *
516 * @param jar the user jar for the map-reduce job.
517 */
518 public void setJar(String jar) { set(JobContext.JAR, jar); }
519
520 /**
 * Get the pattern for jar contents to unpack on the tasktracker.
 *
 * @return the unpack pattern, defaulting to {@link #UNPACK_JAR_PATTERN_DEFAULT}.
522 */
523 public Pattern getJarUnpackPattern() {
524 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
525 }
526
527
528 /**
529 * Set the job's jar file by finding an example class location.
530 *
531 * @param cls the example class.
532 */
533 public void setJarByClass(Class cls) {
534 String jar = ClassUtil.findContainingJar(cls);
535 if (jar != null) {
536 setJar(jar);
537 }
538 }
539
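/**
 * Get the configured local directories where intermediate MapReduce data
 * is stored (<code>mapreduce.cluster.local.dir</code>).
 *
 * @return the array of configured local directories.
 */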
540 public String[] getLocalDirs() throws IOException {
541 return getTrimmedStrings(MRConfig.LOCAL_DIR);
542 }
543
544 /**
 * @deprecated Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
546 */
547 @Deprecated
548 public void deleteLocalFiles() throws IOException {
549 String[] localDirs = getLocalDirs();
550 for (int i = 0; i < localDirs.length; i++) {
551 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
552 }
553 }
554
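/**
 * Delete the given subdirectory under each of the configured local
 * directories.
 *
 * @param subdir the subdirectory to delete from each local directory.
 * @throws IOException if the local file system cannot be accessed.
 */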
555 public void deleteLocalFiles(String subdir) throws IOException {
556 String[] localDirs = getLocalDirs();
557 for (int i = 0; i < localDirs.length; i++) {
558 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
559 }
560 }
561
562 /**
563 * Constructs a local file name. Files are distributed among configured
564 * local directories.
565 */
566 public Path getLocalPath(String pathString) throws IOException {
567 return getLocalPath(MRConfig.LOCAL_DIR, pathString);
568 }
569
570 /**
571 * Get the reported username for this job.
572 *
573 * @return the username
574 */
575 public String getUser() {
576 return get(JobContext.USER_NAME);
577 }
578
579 /**
580 * Set the reported username for this job.
581 *
582 * @param user the username for this job.
583 */
584 public void setUser(String user) {
585 set(JobContext.USER_NAME, user);
586 }
587
588
589
590 /**
591 * Set whether the framework should keep the intermediate files for
592 * failed tasks.
593 *
594 * @param keep <code>true</code> if framework should keep the intermediate files
595 * for failed tasks, <code>false</code> otherwise.
596 *
597 */
598 public void setKeepFailedTaskFiles(boolean keep) {
599 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
600 }
601
602 /**
603 * Should the temporary files for failed tasks be kept?
604 *
605 * @return should the files be kept?
606 */
607 public boolean getKeepFailedTaskFiles() {
608 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
609 }
610
611 /**
612 * Set a regular expression for task names that should be kept.
613 * The regular expression ".*_m_000123_0" would keep the files
614 * for the first instance of map 123 that ran.
615 *
616 * @param pattern the java.util.regex.Pattern to match against the
617 * task names.
618 */
619 public void setKeepTaskFilesPattern(String pattern) {
620 set(JobContext.PRESERVE_FILES_PATTERN, pattern);
621 }
622
623 /**
624 * Get the regular expression that is matched against the task names
625 * to see if we need to keep the files.
626 *
 * @return the pattern as a string, if it was set, otherwise null.
628 */
629 public String getKeepTaskFilesPattern() {
630 return get(JobContext.PRESERVE_FILES_PATTERN);
631 }
632
633 /**
634 * Set the current working directory for the default file system.
635 *
636 * @param dir the new current working directory.
637 */
638 public void setWorkingDirectory(Path dir) {
639 dir = new Path(getWorkingDirectory(), dir);
640 set(JobContext.WORKING_DIR, dir.toString());
641 }
642
643 /**
644 * Get the current working directory for the default file system.
645 *
646 * @return the directory name.
647 */
648 public Path getWorkingDirectory() {
649 String name = get(JobContext.WORKING_DIR);
650 if (name != null) {
651 return new Path(name);
652 } else {
653 try {
654 Path dir = FileSystem.get(this).getWorkingDirectory();
655 set(JobContext.WORKING_DIR, dir.toString());
656 return dir;
657 } catch (IOException e) {
658 throw new RuntimeException(e);
659 }
660 }
661 }
662
663 /**
664 * Sets the number of tasks that a spawned task JVM should run
 * before it exits.
666 * @param numTasks the number of tasks to execute; defaults to 1;
667 * -1 signifies no limit
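 *
 * <p>For example (illustrative), to let each JVM run an unlimited number
 * of tasks:</p>
 * <p><blockquote><pre>
 * conf.setNumTasksToExecutePerJvm(-1);
 * </pre></blockquote></p>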
668 */
669 public void setNumTasksToExecutePerJvm(int numTasks) {
670 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
671 }
672
673 /**
 * Get the number of tasks that a spawned JVM should execute.
 * @return the number of tasks per JVM, defaulting to 1.
675 */
676 public int getNumTasksToExecutePerJvm() {
677 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
678 }
679
680 /**
681 * Get the {@link InputFormat} implementation for the map-reduce job,
 * defaults to {@link TextInputFormat} if not specified explicitly.
683 *
684 * @return the {@link InputFormat} implementation for the map-reduce job.
685 */
686 public InputFormat getInputFormat() {
687 return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
688 TextInputFormat.class,
689 InputFormat.class),
690 this);
691 }
692
693 /**
694 * Set the {@link InputFormat} implementation for the map-reduce job.
695 *
696 * @param theClass the {@link InputFormat} implementation for the map-reduce
697 * job.
698 */
699 public void setInputFormat(Class<? extends InputFormat> theClass) {
700 setClass("mapred.input.format.class", theClass, InputFormat.class);
701 }
702
703 /**
704 * Get the {@link OutputFormat} implementation for the map-reduce job,
 * defaults to {@link TextOutputFormat} if not specified explicitly.
706 *
707 * @return the {@link OutputFormat} implementation for the map-reduce job.
708 */
709 public OutputFormat getOutputFormat() {
710 return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
711 TextOutputFormat.class,
712 OutputFormat.class),
713 this);
714 }
715
716 /**
717 * Get the {@link OutputCommitter} implementation for the map-reduce job,
718 * defaults to {@link FileOutputCommitter} if not specified explicitly.
719 *
720 * @return the {@link OutputCommitter} implementation for the map-reduce job.
721 */
722 public OutputCommitter getOutputCommitter() {
723 return (OutputCommitter)ReflectionUtils.newInstance(
724 getClass("mapred.output.committer.class", FileOutputCommitter.class,
725 OutputCommitter.class), this);
726 }
727
728 /**
729 * Set the {@link OutputCommitter} implementation for the map-reduce job.
730 *
731 * @param theClass the {@link OutputCommitter} implementation for the map-reduce
732 * job.
733 */
734 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
735 setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
736 }
737
738 /**
739 * Set the {@link OutputFormat} implementation for the map-reduce job.
740 *
741 * @param theClass the {@link OutputFormat} implementation for the map-reduce
742 * job.
743 */
744 public void setOutputFormat(Class<? extends OutputFormat> theClass) {
745 setClass("mapred.output.format.class", theClass, OutputFormat.class);
746 }
747
748 /**
749 * Should the map outputs be compressed before transfer?
750 *
751 * @param compress should the map outputs be compressed?
752 */
753 public void setCompressMapOutput(boolean compress) {
754 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
755 }
756
757 /**
 * Are the outputs of the maps to be compressed?
759 *
760 * @return <code>true</code> if the outputs of the maps are to be compressed,
761 * <code>false</code> otherwise.
762 */
763 public boolean getCompressMapOutput() {
764 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
765 }
766
767 /**
768 * Set the given class as the {@link CompressionCodec} for the map outputs.
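 *
 * <p>For example (illustrative; any available codec class may be used):</p>
 * <p><blockquote><pre>
 * conf.setMapOutputCompressorClass(org.apache.hadoop.io.compress.GzipCodec.class);
 * </pre></blockquote></p>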
769 *
770 * @param codecClass the {@link CompressionCodec} class that will compress
771 * the map outputs.
772 */
773 public void
774 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
775 setCompressMapOutput(true);
776 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
777 CompressionCodec.class);
778 }
779
780 /**
781 * Get the {@link CompressionCodec} for compressing the map outputs.
782 *
783 * @param defaultValue the {@link CompressionCodec} to return if not set
784 * @return the {@link CompressionCodec} class that should be used to compress the
785 * map outputs.
786 * @throws IllegalArgumentException if the class was specified, but not found
787 */
788 public Class<? extends CompressionCodec>
789 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
790 Class<? extends CompressionCodec> codecClass = defaultValue;
791 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
792 if (name != null) {
793 try {
794 codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
795 } catch (ClassNotFoundException e) {
796 throw new IllegalArgumentException("Compression codec " + name +
797 " was not found.", e);
798 }
799 }
800 return codecClass;
801 }
802
803 /**
804 * Get the key class for the map output data. If it is not set, use the
805 * (final) output key class. This allows the map output key class to be
806 * different than the final output key class.
807 *
808 * @return the map output key class.
809 */
810 public Class<?> getMapOutputKeyClass() {
811 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
812 if (retv == null) {
813 retv = getOutputKeyClass();
814 }
815 return retv;
816 }
817
818 /**
 * Set the key class for the map output data. This allows the user to
 * specify the map output key class to be different than the final output
 * key class.
822 *
823 * @param theClass the map output key class.
824 */
825 public void setMapOutputKeyClass(Class<?> theClass) {
826 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
827 }
828
829 /**
830 * Get the value class for the map output data. If it is not set, use the
 * (final) output value class. This allows the map output value class to be
832 * different than the final output value class.
833 *
834 * @return the map output value class.
835 */
836 public Class<?> getMapOutputValueClass() {
837 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
838 Object.class);
839 if (retv == null) {
840 retv = getOutputValueClass();
841 }
842 return retv;
843 }
844
845 /**
846 * Set the value class for the map output data. This allows the user to
847 * specify the map output value class to be different than the final output
848 * value class.
849 *
850 * @param theClass the map output value class.
851 */
852 public void setMapOutputValueClass(Class<?> theClass) {
853 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
854 }
855
856 /**
857 * Get the key class for the job output data.
858 *
859 * @return the key class for the job output data.
860 */
861 public Class<?> getOutputKeyClass() {
862 return getClass(JobContext.OUTPUT_KEY_CLASS,
863 LongWritable.class, Object.class);
864 }
865
866 /**
867 * Set the key class for the job output data.
868 *
869 * @param theClass the key class for the job output data.
870 */
871 public void setOutputKeyClass(Class<?> theClass) {
872 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
873 }
874
875 /**
876 * Get the {@link RawComparator} comparator used to compare keys.
877 *
878 * @return the {@link RawComparator} comparator used to compare keys.
879 */
880 public RawComparator getOutputKeyComparator() {
881 Class<? extends RawComparator> theClass = getClass(
882 JobContext.KEY_COMPARATOR, null, RawComparator.class);
883 if (theClass != null)
884 return ReflectionUtils.newInstance(theClass, this);
885 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
886 }
887
888 /**
889 * Set the {@link RawComparator} comparator used to compare keys.
890 *
891 * @param theClass the {@link RawComparator} comparator used to
892 * compare keys.
893 * @see #setOutputValueGroupingComparator(Class)
894 */
895 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
896 setClass(JobContext.KEY_COMPARATOR,
897 theClass, RawComparator.class);
898 }
899
900 /**
901 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
902 *
 * @param keySpec the key specification of the form -k pos1[,pos2], where
 * pos is of the form f[.c][opts]. Here f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character positions are numbered
907 * starting with 1; a character position of zero in pos2 indicates the
908 * field's last character. If '.c' is omitted from pos1, it defaults to 1
909 * (the beginning of the field); if omitted from pos2, it defaults to 0
910 * (the end of the field). opts are ordering options. The supported options
911 * are:
912 * -n, (Sort numerically)
913 * -r, (Reverse the result of comparison)
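 *
 * For example (an illustrative spec), to sort the second key field
 * numerically and in reverse order:
 * <p><blockquote><pre>
 * conf.setKeyFieldComparatorOptions("-k2,2nr");
 * </pre></blockquote></p>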
914 */
915 public void setKeyFieldComparatorOptions(String keySpec) {
916 setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
917 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
918 }
919
920 /**
921 * Get the {@link KeyFieldBasedComparator} options
922 */
923 public String getKeyFieldComparatorOption() {
924 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
925 }
926
927 /**
928 * Set the {@link KeyFieldBasedPartitioner} options used for
929 * {@link Partitioner}
930 *
 * @param keySpec the key specification of the form -k pos1[,pos2], where
 * pos is of the form f[.c][opts]. Here f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character positions are numbered
935 * starting with 1; a character position of zero in pos2 indicates the
936 * field's last character. If '.c' is omitted from pos1, it defaults to 1
937 * (the beginning of the field); if omitted from pos2, it defaults to 0
938 * (the end of the field).
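 *
 * For example (an illustrative spec), to partition on the first two key
 * fields:
 * <p><blockquote><pre>
 * conf.setKeyFieldPartitionerOptions("-k1,2");
 * </pre></blockquote></p>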
939 */
940 public void setKeyFieldPartitionerOptions(String keySpec) {
941 setPartitionerClass(KeyFieldBasedPartitioner.class);
942 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
943 }
944
945 /**
946 * Get the {@link KeyFieldBasedPartitioner} options
947 */
948 public String getKeyFieldPartitionerOption() {
949 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
950 }
951
952 /**
953 * Get the user defined {@link WritableComparable} comparator for
954 * grouping keys of inputs to the combiner.
955 *
956 * @return comparator set by the user for grouping values.
957 * @see #setCombinerKeyGroupingComparator(Class) for details.
958 */
959 public RawComparator getCombinerKeyGroupingComparator() {
960 Class<? extends RawComparator> theClass = getClass(
961 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
962 if (theClass == null) {
963 return getOutputKeyComparator();
964 }
965
966 return ReflectionUtils.newInstance(theClass, this);
967 }
968
969 /**
970 * Get the user defined {@link WritableComparable} comparator for
971 * grouping keys of inputs to the reduce.
972 *
973 * @return comparator set by the user for grouping values.
974 * @see #setOutputValueGroupingComparator(Class) for details.
975 */
976 public RawComparator getOutputValueGroupingComparator() {
977 Class<? extends RawComparator> theClass = getClass(
978 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
979 if (theClass == null) {
980 return getOutputKeyComparator();
981 }
982
983 return ReflectionUtils.newInstance(theClass, this);
984 }
985
986 /**
987 * Set the user defined {@link RawComparator} comparator for
988 * grouping keys in the input to the combiner.
 *
 * <p>This comparator should be provided if the equivalence rules for keys
 * for sorting the intermediates are different from those for grouping keys
 * before each call to
 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
 *
 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
 *
 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
 * how keys are sorted, this can be used in conjunction to simulate
 * <i>secondary sort on values</i>.</p>
 *
1002 * <p><i>Note</i>: This is not a guarantee of the combiner sort being
1003 * <i>stable</i> in any sense. (In any case, with the order of available
1004 * map-outputs to the combiner being non-deterministic, it wouldn't make
1005 * that much sense.)</p>
1006 *
1007 * @param theClass the comparator class to be used for grouping keys for the
1008 * combiner. It should implement <code>RawComparator</code>.
1009 * @see #setOutputKeyComparatorClass(Class)
1010 */
1011 public void setCombinerKeyGroupingComparator(
1012 Class<? extends RawComparator> theClass) {
1013 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
1014 theClass, RawComparator.class);
1015 }
1016
1017 /**
1018 * Set the user defined {@link RawComparator} comparator for
1019 * grouping keys in the input to the reduce.
1020 *
1021 * <p>This comparator should be provided if the equivalence rules for keys
1022 * for sorting the intermediates are different from those for grouping keys
1023 * before each call to
1024 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
1025 *
1026 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
1027 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
1028 *
1029 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
1030 * how keys are sorted, this can be used in conjunction to simulate
1031 * <i>secondary sort on values</i>.</p>
1032 *
1033 * <p><i>Note</i>: This is not a guarantee of the reduce sort being
1034 * <i>stable</i> in any sense. (In any case, with the order of available
1035 * map-outputs to the reduce being non-deterministic, it wouldn't make
1036 * that much sense.)</p>
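 *
 * <p>A minimal secondary-sort sketch (illustrative; the two comparator
 * classes are hypothetical user-supplied {@link RawComparator}s):</p>
 * <p><blockquote><pre>
 * // sort on the full composite key ...
 * job.setOutputKeyComparatorClass(FullKeyComparator.class);
 * // ... but group reduce inputs on its primary part only
 * job.setOutputValueGroupingComparator(PrimaryGroupingComparator.class);
 * </pre></blockquote></p>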
1037 *
1038 * @param theClass the comparator class to be used for grouping keys.
1039 * It should implement <code>RawComparator</code>.
1040 * @see #setOutputKeyComparatorClass(Class)
1041 * @see #setCombinerKeyGroupingComparator(Class)
1042 */
1043 public void setOutputValueGroupingComparator(
1044 Class<? extends RawComparator> theClass) {
1045 setClass(JobContext.GROUP_COMPARATOR_CLASS,
1046 theClass, RawComparator.class);
1047 }
1048
1049 /**
1050 * Should the framework use the new context-object code for running
1051 * the mapper?
1052 * @return true, if the new api should be used
1053 */
1054 public boolean getUseNewMapper() {
1055 return getBoolean("mapred.mapper.new-api", false);
1056 }
1057 /**
1058 * Set whether the framework should use the new api for the mapper.
1059 * This is the default for jobs submitted with the new Job api.
1060 * @param flag true, if the new api should be used
1061 */
1062 public void setUseNewMapper(boolean flag) {
1063 setBoolean("mapred.mapper.new-api", flag);
1064 }
1065
1066 /**
1067 * Should the framework use the new context-object code for running
1068 * the reducer?
1069 * @return true, if the new api should be used
1070 */
1071 public boolean getUseNewReducer() {
1072 return getBoolean("mapred.reducer.new-api", false);
1073 }
1074 /**
1075 * Set whether the framework should use the new api for the reducer.
1076 * This is the default for jobs submitted with the new Job api.
1077 * @param flag true, if the new api should be used
1078 */
1079 public void setUseNewReducer(boolean flag) {
1080 setBoolean("mapred.reducer.new-api", flag);
1081 }
1082
1083 /**
1084 * Get the value class for job outputs.
1085 *
1086 * @return the value class for job outputs.
1087 */
1088 public Class<?> getOutputValueClass() {
1089 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
1090 }
1091
1092 /**
1093 * Set the value class for job outputs.
1094 *
1095 * @param theClass the value class for job outputs.
1096 */
1097 public void setOutputValueClass(Class<?> theClass) {
1098 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
1099 }
1100
1101 /**
1102 * Get the {@link Mapper} class for the job.
1103 *
1104 * @return the {@link Mapper} class for the job.
1105 */
1106 public Class<? extends Mapper> getMapperClass() {
1107 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
1108 }
1109
1110 /**
1111 * Set the {@link Mapper} class for the job.
1112 *
1113 * @param theClass the {@link Mapper} class for the job.
1114 */
1115 public void setMapperClass(Class<? extends Mapper> theClass) {
1116 setClass("mapred.mapper.class", theClass, Mapper.class);
1117 }
1118
1119 /**
1120 * Get the {@link MapRunnable} class for the job.
1121 *
1122 * @return the {@link MapRunnable} class for the job.
1123 */
1124 public Class<? extends MapRunnable> getMapRunnerClass() {
1125 return getClass("mapred.map.runner.class",
1126 MapRunner.class, MapRunnable.class);
1127 }
1128
1129 /**
1130 * Expert: Set the {@link MapRunnable} class for the job.
1131 *
1132 * Typically used to exert greater control on {@link Mapper}s.
1133 *
1134 * @param theClass the {@link MapRunnable} class for the job.
1135 */
1136 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1137 setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1138 }
1139
1140 /**
1141 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
1142 * to be sent to the {@link Reducer}s.
1143 *
1144 * @return the {@link Partitioner} used to partition map-outputs.
1145 */
1146 public Class<? extends Partitioner> getPartitionerClass() {
1147 return getClass("mapred.partitioner.class",
1148 HashPartitioner.class, Partitioner.class);
1149 }
1150
1151 /**
1152 * Set the {@link Partitioner} class used to partition
1153 * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1154 *
1155 * @param theClass the {@link Partitioner} used to partition map-outputs.
1156 */
1157 public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1158 setClass("mapred.partitioner.class", theClass, Partitioner.class);
1159 }
1160
1161 /**
1162 * Get the {@link Reducer} class for the job.
1163 *
1164 * @return the {@link Reducer} class for the job.
1165 */
1166 public Class<? extends Reducer> getReducerClass() {
1167 return getClass("mapred.reducer.class",
1168 IdentityReducer.class, Reducer.class);
1169 }
1170
1171 /**
1172 * Set the {@link Reducer} class for the job.
1173 *
1174 * @param theClass the {@link Reducer} class for the job.
1175 */
1176 public void setReducerClass(Class<? extends Reducer> theClass) {
1177 setClass("mapred.reducer.class", theClass, Reducer.class);
1178 }
1179
1180 /**
1181 * Get the user-defined <i>combiner</i> class used to combine map-outputs
 * before being sent to the reducers. Typically the combiner is the same as
 * the {@link Reducer} for the job, i.e. {@link #getReducerClass()}.
1184 *
1185 * @return the user-defined combiner class used to combine map-outputs.
1186 */
1187 public Class<? extends Reducer> getCombinerClass() {
1188 return getClass("mapred.combiner.class", null, Reducer.class);
1189 }
1190
1191 /**
1192 * Set the user-defined <i>combiner</i> class used to combine map-outputs
1193 * before being sent to the reducers.
1194 *
1195 * <p>The combiner is an application-specified aggregation operation, which
1196 * can help cut down the amount of data transferred between the
1197 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1198 *
1199 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1200 * the mapper and reducer tasks. In general, the combiner is called as the
1201 * sort/merge result is written to disk. The combiner must:
1202 * <ul>
1203 * <li> be side-effect free</li>
1204 * <li> have the same input and output key types and the same input and
1205 * output value types</li>
1206 * </ul></p>
1207 *
 * <p>Typically the combiner is the same as the <code>Reducer</code> for the
1209 * job i.e. {@link #setReducerClass(Class)}.</p>
1210 *
1211 * @param theClass the user-defined combiner class used to combine
1212 * map-outputs.
1213 */
1214 public void setCombinerClass(Class<? extends Reducer> theClass) {
1215 setClass("mapred.combiner.class", theClass, Reducer.class);
1216 }
1217
1218 /**
1219 * Should speculative execution be used for this job?
1220 * Defaults to <code>true</code>.
1221 *
 * @return <code>true</code> if speculative execution is to be used for this
 * job, <code>false</code> otherwise.
1224 */
1225 public boolean getSpeculativeExecution() {
1226 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1227 }
1228
1229 /**
1230 * Turn speculative execution on or off for this job.
1231 *
1232 * @param speculativeExecution <code>true</code> if speculative execution
1233 * should be turned on, else <code>false</code>.
1234 */
1235 public void setSpeculativeExecution(boolean speculativeExecution) {
1236 setMapSpeculativeExecution(speculativeExecution);
1237 setReduceSpeculativeExecution(speculativeExecution);
1238 }
1239
1240 /**
1241 * Should speculative execution be used for this job for map tasks?
1242 * Defaults to <code>true</code>.
1243 *
 * @return <code>true</code> if speculative execution is to be
 * used for this job for map tasks,
 * <code>false</code> otherwise.
1247 */
1248 public boolean getMapSpeculativeExecution() {
1249 return getBoolean(JobContext.MAP_SPECULATIVE, true);
1250 }
1251
1252 /**
1253 * Turn speculative execution on or off for this job for map tasks.
1254 *
1255 * @param speculativeExecution <code>true</code> if speculative execution
1256 * should be turned on for map tasks,
1257 * else <code>false</code>.
1258 */
1259 public void setMapSpeculativeExecution(boolean speculativeExecution) {
1260 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1261 }
1262
1263 /**
1264 * Should speculative execution be used for this job for reduce tasks?
1265 * Defaults to <code>true</code>.
1266 *
 * @return <code>true</code> if speculative execution is to be used
 * for reduce tasks for this job,
 * <code>false</code> otherwise.
1270 */
1271 public boolean getReduceSpeculativeExecution() {
1272 return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1273 }
1274
1275 /**
1276 * Turn speculative execution on or off for this job for reduce tasks.
1277 *
1278 * @param speculativeExecution <code>true</code> if speculative execution
1279 * should be turned on for reduce tasks,
1280 * else <code>false</code>.
1281 */
1282 public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1283 setBoolean(JobContext.REDUCE_SPECULATIVE,
1284 speculativeExecution);
1285 }
1286
1287 /**
 * Get the configured number of map tasks for this job.
 * Defaults to <code>1</code>.
 *
 * @return the number of map tasks for this job.
1292 */
1293 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1294
1295 /**
1296 * Set the number of map tasks for this job.
1297 *
1298 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
1299 * number of spawned map tasks depends on the number of {@link InputSplit}s
1300 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1301 *
1302 * A custom {@link InputFormat} is typically used to accurately control
1303 * the number of map tasks for the job.</p>
1304 *
1305 * <h4 id="NoOfMaps">How many maps?</h4>
1306 *
1307 * <p>The number of maps is usually driven by the total size of the inputs
1308 * i.e. total number of blocks of the input files.</p>
1309 *
1310 * <p>The right level of parallelism for maps seems to be around 10-100 maps
1311 * per-node, although it has been set up to 300 or so for very cpu-light map
1312 * tasks. Task setup takes awhile, so it is best if the maps take at least a
1313 * minute to execute.</p>
1314 *
1315 * <p>The default behavior of file-based {@link InputFormat}s is to split the
1316 * input into <i>logical</i> {@link InputSplit}s based on the total size, in
1317 * bytes, of input files. However, the {@link FileSystem} blocksize of the
1318 * input files is treated as an upper bound for input splits. A lower bound
1319 * on the split size can be set via
1320 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1321 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1322 *
 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB
 * (10TB / 128MB = 81,920 splits), you'll end up with roughly 82,000 maps,
 * unless {@link #setNumMapTasks(int)} is used to set it even higher.</p>
1326 *
1327 * @param n the number of map tasks for this job.
1328 * @see InputFormat#getSplits(JobConf, int)
1329 * @see FileInputFormat
1330 * @see FileSystem#getDefaultBlockSize()
1331 * @see FileStatus#getBlockSize()
1332 */
1333 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1334
1335 /**
 * Get the configured number of reduce tasks for this job. Defaults to
1337 * <code>1</code>.
1338 *
1339 * @return the number of reduce tasks for this job.
1340 */
1341 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1342
1343 /**
1344 * Set the requisite number of reduce tasks for this job.
1345 *
1346 * <h4 id="NoOfReduces">How many reduces?</h4>
1347 *
1348 * <p>The right number of reduces seems to be <code>0.95</code> or
 * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
1350 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1351 * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1352 * </p>
1353 *
1354 * <p>With <code>0.95</code> all of the reduces can launch immediately and
 * start transferring map outputs as the maps finish. With <code>1.75</code>
1356 * the faster nodes will finish their first round of reduces and launch a
1357 * second wave of reduces doing a much better job of load balancing.</p>
1358 *
 * <p>Increasing the number of reduces increases the framework overhead, but
 * improves load balancing and lowers the cost of failures.</p>
1361 *
1362 * <p>The scaling factors above are slightly less than whole numbers to
1363 * reserve a few reduce slots in the framework for speculative-tasks, failures
1364 * etc.</p>
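 *
 * <p>For example (illustrative figures): with 10 nodes and
 * <code>mapreduce.tasktracker.reduce.tasks.maximum</code> set to 2, a
 * factor of <code>0.95</code> suggests <code>0.95 * 10 * 2 = 19</code>
 * reduces.</p>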
1365 *
1366 * <h4 id="ReducerNone">Reducer NONE</h4>
1367 *
1368 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1369 *
 * <p>In this case the outputs of the map-tasks go directly to the distributed
 * file-system, to the path set by
 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
 * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
1374 *
1375 * @param n the number of reduce tasks for this job.
1376 */
1377 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1378
1379 /**
1380 * Get the configured number of maximum attempts that will be made to run a
1381 * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1382 * property. If this property is not already set, the default is 4 attempts.
1383 *
1384 * @return the max number of attempts per map task.
1385 */
1386 public int getMaxMapAttempts() {
1387 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1388 }
1389
1390 /**
1391 * Expert: Set the number of maximum attempts that will be made to run a
1392 * map task.
1393 *
1394 * @param n the number of attempts per map task.
1395 */
1396 public void setMaxMapAttempts(int n) {
1397 setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1398 }
1399
1400 /**
1401 * Get the configured number of maximum attempts that will be made to run a
1402 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1403 * property. If this property is not already set, the default is 4 attempts.
1404 *
1405 * @return the max number of attempts per reduce task.
1406 */
1407 public int getMaxReduceAttempts() {
1408 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1409 }
1410 /**
1411 * Expert: Set the number of maximum attempts that will be made to run a
1412 * reduce task.
1413 *
1414 * @param n the number of attempts per reduce task.
1415 */
1416 public void setMaxReduceAttempts(int n) {
1417 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1418 }
1419
1420 /**
1421 * Get the user-specified job name. This is only used to identify the
1422 * job to the user.
1423 *
1424 * @return the job's name, defaulting to "".
1425 */
1426 public String getJobName() {
1427 return get(JobContext.JOB_NAME, "");
1428 }
1429
1430 /**
1431 * Set the user-specified job name.
1432 *
1433 * @param name the job's new name.
1434 */
1435 public void setJobName(String name) {
1436 set(JobContext.JOB_NAME, name);
1437 }
1438
1439 /**
1440 * Get the user-specified session identifier. The default is the empty string.
1441 *
1442 * The session identifier is used to tag metric data that is reported to some
1443 * performance metrics system via the org.apache.hadoop.metrics API. The
1444 * session identifier is intended, in particular, for use by Hadoop-On-Demand
1445 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
1446 * HOD will set the session identifier by modifying the mapred-site.xml file
1447 * before starting the cluster.
1448 *
 * When not running under HOD, this identifier is expected to remain set to
1450 * the empty string.
1451 *
1452 * @return the session identifier, defaulting to "".
1453 */
1454 @Deprecated
1455 public String getSessionId() {
1456 return get("session.id", "");
1457 }
1458
1459 /**
1460 * Set the user-specified session identifier.
1461 *
1462 * @param sessionId the new session id.
1463 */
1464 @Deprecated
1465 public void setSessionId(String sessionId) {
1466 set("session.id", sessionId);
1467 }
1468
1469 /**
1470 * Set the maximum no. of failures of a given job per tasktracker.
1471 * If the no. of task failures exceeds <code>noFailures</code>, the
1472 * tasktracker is <i>blacklisted</i> for this job.
1473 *
1474 * @param noFailures maximum no. of failures of a given job per tasktracker.
1475 */
1476 public void setMaxTaskFailuresPerTracker(int noFailures) {
1477 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1478 }
1479
1480 /**
1481 * Expert: Get the maximum no. of failures of a given job per tasktracker.
1482 * If the no. of task failures exceeds this, the tasktracker is
1483 * <i>blacklisted</i> for this job.
1484 *
1485 * @return the maximum no. of failures of a given job per tasktracker.
1486 */
1487 public int getMaxTaskFailuresPerTracker() {
1488 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
1489 }
1490
1491 /**
1492 * Get the maximum percentage of map tasks that can fail without
1493 * the job being aborted.
1494 *
1495 * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
1496 * attempts before being declared as <i>failed</i>.
1497 *
1498 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1499 * the job being declared as {@link JobStatus#FAILED}.
1500 *
1501 * @return the maximum percentage of map tasks that can fail without
1502 * the job being aborted.
1503 */
1504 public int getMaxMapTaskFailuresPercent() {
1505 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1506 }
1507
1508 /**
1509 * Expert: Set the maximum percentage of map tasks that can fail without the
1510 * job being aborted.
1511 *
1512 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
1513 * before being declared as <i>failed</i>.
1514 *
1515 * @param percent the maximum percentage of map tasks that can fail without
1516 * the job being aborted.
1517 */
1518 public void setMaxMapTaskFailuresPercent(int percent) {
1519 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1520 }
1521
1522 /**
1523 * Get the maximum percentage of reduce tasks that can fail without
1524 * the job being aborted.
1525 *
1526 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1527 * attempts before being declared as <i>failed</i>.
1528 *
1529 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
1530 * in the job being declared as {@link JobStatus#FAILED}.
1531 *
1532 * @return the maximum percentage of reduce tasks that can fail without
1533 * the job being aborted.
1534 */
1535 public int getMaxReduceTaskFailuresPercent() {
1536 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1537 }
1538
1539 /**
1540 * Set the maximum percentage of reduce tasks that can fail without the job
1541 * being aborted.
1542 *
1543 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
1544 * attempts before being declared as <i>failed</i>.
1545 *
1546 * @param percent the maximum percentage of reduce tasks that can fail without
1547 * the job being aborted.
1548 */
1549 public void setMaxReduceTaskFailuresPercent(int percent) {
1550 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1551 }
1552
1553 /**
1554 * Set {@link JobPriority} for this job.
1555 *
1556 * @param prio the {@link JobPriority} for this job.
1557 */
1558 public void setJobPriority(JobPriority prio) {
1559 set(JobContext.PRIORITY, prio.toString());
1560 }
1561
1562 /**
1563 * Get the {@link JobPriority} for this job.
1564 *
1565 * @return the {@link JobPriority} for this job.
1566 */
1567 public JobPriority getJobPriority() {
1568 String prio = get(JobContext.PRIORITY);
1569 if(prio == null) {
1570 return JobPriority.NORMAL;
1571 }
1572
1573 return JobPriority.valueOf(prio);
1574 }
1575
1576 /**
1577 * Set JobSubmitHostName for this job.
1578 *
1579 * @param hostname the JobSubmitHostName for this job.
1580 */
1581 void setJobSubmitHostName(String hostname) {
1582 set(MRJobConfig.JOB_SUBMITHOST, hostname);
1583 }
1584
1585 /**
1586 * Get the JobSubmitHostName for this job.
1587 *
1588 * @return the JobSubmitHostName for this job.
1589 */
1590 String getJobSubmitHostName() {
1591 String hostname = get(MRJobConfig.JOB_SUBMITHOST);
1592
1593 return hostname;
1594 }
1595
1596 /**
1597 * Set JobSubmitHostAddress for this job.
1598 *
1599 * @param hostadd the JobSubmitHostAddress for this job.
1600 */
1601 void setJobSubmitHostAddress(String hostadd) {
1602 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1603 }
1604
1605 /**
1606 * Get JobSubmitHostAddress for this job.
1607 *
1608 * @return JobSubmitHostAddress for this job.
1609 */
1610 String getJobSubmitHostAddress() {
1611 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);
1612
1613 return hostadd;
1614 }
1615
1616 /**
1617 * Get whether the task profiling is enabled.
1618 * @return true if some tasks will be profiled
1619 */
1620 public boolean getProfileEnabled() {
1621 return getBoolean(JobContext.TASK_PROFILE, false);
1622 }
1623
1624 /**
* Set whether the system should collect profiler information for some of
* the tasks in this job. The information is stored in the user log
* directory.
1628 * @param newValue true means it should be gathered
1629 */
1630 public void setProfileEnabled(boolean newValue) {
1631 setBoolean(JobContext.TASK_PROFILE, newValue);
1632 }
1633
1634 /**
1635 * Get the profiler configuration arguments.
1636 *
1637 * The default value for this property is
1638 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1639 *
1640 * @return the parameters to pass to the task child to configure profiling
1641 */
1642 public String getProfileParams() {
1643 return get(JobContext.TASK_PROFILE_PARAMS,
1644 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1645 "verbose=n,file=%s");
1646 }
1647
1648 /**
1649 * Set the profiler configuration arguments. If the string contains a '%s' it
1650 * will be replaced with the name of the profiling output file when the task
1651 * runs.
1652 *
1653 * This value is passed to the task child JVM on the command line.
1654 *
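* <p>An illustrative snippet (any JVM profiling-agent argument string can
* be used; <code>conf</code> is assumed to be this job's JobConf):</p>
* <p><blockquote><pre>
* conf.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
*     "force=n,thread=y,verbose=n,file=%s");
* </pre></blockquote></p>
*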
1655 * @param value the configuration string
1656 */
1657 public void setProfileParams(String value) {
1658 set(JobContext.TASK_PROFILE_PARAMS, value);
1659 }
1660
1661 /**
1662 * Get the range of maps or reduces to profile.
1663 * @param isMap is the task a map?
1664 * @return the task ranges
1665 */
1666 public IntegerRanges getProfileTaskRange(boolean isMap) {
1667 return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
1668 JobContext.NUM_REDUCE_PROFILES), "0-2");
1669 }
1670
1671 /**
* Set the ranges of maps or reduces to profile. {@link #setProfileEnabled(boolean)}
* must also be called with <code>true</code> for profiling to take effect.
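* <p>For example, to profile the first two map tasks of a job
* (illustrative snippet; <code>conf</code> is this job's JobConf):</p>
* <p><blockquote><pre>
* conf.setProfileEnabled(true);
* conf.setProfileTaskRange(true, "0-1");
* </pre></blockquote></p>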
* @param isMap is the task a map?
* @param newValue a set of integer ranges of the task ids
1675 */
1676 public void setProfileTaskRange(boolean isMap, String newValue) {
1677 // parse the value to make sure it is legal
1678 new Configuration.IntegerRanges(newValue);
1679 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
1680 newValue);
1681 }
1682
1683 /**
1684 * Set the debug script to run when the map tasks fail.
1685 *
* <p>The debug script can aid debugging of failed map tasks. The script is
* given the task's stdout, stderr, syslog, and jobconf files as arguments.</p>
1688 *
1689 * <p>The debug command, run on the node where the map failed, is:</p>
* <p><blockquote><pre>
* $script $stdout $stderr $syslog $jobconf
* </pre></blockquote></p>
1693 *
* <p>The script file is distributed through {@link DistributedCache}
* APIs. The script needs to be symlinked.</p>
1696 *
* <p>Here is an example of how to submit a script:</p>
* <p><blockquote><pre>
* job.setMapDebugScript("./myscript");
* DistributedCache.createSymlink(job);
* DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
* </pre></blockquote></p>
1703 *
1704 * @param mDbgScript the script name
1705 */
1706 public void setMapDebugScript(String mDbgScript) {
1707 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1708 }
1709
1710 /**
1711 * Get the map task's debug script.
1712 *
* @return the debug script for the mapred job for failed map tasks.
1714 * @see #setMapDebugScript(String)
1715 */
1716 public String getMapDebugScript() {
1717 return get(JobContext.MAP_DEBUG_SCRIPT);
1718 }
1719
1720 /**
1721 * Set the debug script to run when the reduce tasks fail.
1722 *
* <p>The debug script can aid debugging of failed reduce tasks. The script
* is given the task's stdout, stderr, syslog, and jobconf files as arguments.</p>
1725 *
* <p>The debug command, run on the node where the reduce failed, is:</p>
* <p><blockquote><pre>
* $script $stdout $stderr $syslog $jobconf
* </pre></blockquote></p>
1730 *
* <p>The script file is distributed through {@link DistributedCache}
* APIs. The script file needs to be symlinked.</p>
1733 *
* <p>Here is an example of how to submit a script:</p>
* <p><blockquote><pre>
* job.setReduceDebugScript("./myscript");
* DistributedCache.createSymlink(job);
* DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
* </pre></blockquote></p>
1740 *
1741 * @param rDbgScript the script name
1742 */
1743 public void setReduceDebugScript(String rDbgScript) {
1744 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1745 }
1746
1747 /**
* Get the reduce task's debug script.
1749 *
1750 * @return the debug script for the mapred job for failed reduce tasks.
1751 * @see #setReduceDebugScript(String)
1752 */
1753 public String getReduceDebugScript() {
1754 return get(JobContext.REDUCE_DEBUG_SCRIPT);
1755 }
1756
1757 /**
* Get the URI to be invoked in order to send a notification after the job
* has completed (success/failure).
*
* @return the job end notification URI, or <code>null</code> if it hasn't
* been set.
1763 * @see #setJobEndNotificationURI(String)
1764 */
1765 public String getJobEndNotificationURI() {
1766 return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1767 }
1768
1769 /**
* Set the URI to be invoked in order to send a notification after the job
* has completed (success/failure).
*
* <p>The URI can contain two special parameters: <tt>$jobId</tt> and
* <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
* identifier and completion status respectively.</p>
1776 *
1777 * <p>This is typically used by application-writers to implement chaining of
1778 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1779 *
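* <p>An illustrative example (the notification endpoint is hypothetical):</p>
* <p><blockquote><pre>
* conf.setJobEndNotificationURI(
*     "http://example.com/notify?id=$jobId&amp;status=$jobStatus");
* </pre></blockquote></p>
*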
* @param uri the job end notification URI
1781 * @see JobStatus
* @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">
* Job Completion and Chaining</a>
1784 */
1785 public void setJobEndNotificationURI(String uri) {
1786 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1787 }
1788
1789 /**
* Get the job-specific shared directory for use as scratch space.
*
* <p>
* When a job starts, a shared directory is created at
* <code>${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/</code>.
* This directory is exposed to users through
* <code>mapreduce.job.local.dir</code>, so tasks can use it as scratch
* space and share files among themselves.</p>
* This value is also available as a system property.
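*
* <p>A minimal illustrative use from within a task (<code>conf</code> is
* the task's JobConf):</p>
* <p><blockquote><pre>
* File scratch = new File(conf.getJobLocalDir(), "shared.tmp");
* </pre></blockquote></p>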
1801 *
* @return the localized job-specific shared directory
1803 */
1804 public String getJobLocalDir() {
1805 return get(JobContext.JOB_LOCAL_DIR);
1806 }
1807
1808 /**
1809 * Get memory required to run a map task of the job, in MB.
1810 *
1811 * If a value is specified in the configuration, it is returned.
1812 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1813 * <p/>
1814 * For backward compatibility, if the job configuration sets the
1815 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1816 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1817 * after converting it from bytes to MB.
1818 * @return memory required to run a map task of the job, in MB,
1819 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1820 */
1821 public long getMemoryForMapTask() {
1822 long value = getDeprecatedMemoryValue();
1823 if (value == DISABLED_MEMORY_LIMIT) {
1824 value = normalizeMemoryConfigValue(
1825 getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
1826 DISABLED_MEMORY_LIMIT));
1827 }
1828 // In case that M/R 1.x applications use the old property name
1829 if (value == DISABLED_MEMORY_LIMIT) {
1830 value = normalizeMemoryConfigValue(
1831 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1832 DISABLED_MEMORY_LIMIT));
1833 }
1834 return value;
1835 }
1836
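/**
* Set the memory required to run a map task of the job, in MB.
* For backward compatibility, the value is also set under the old
* M/R 1.x property name.
*
* <p>For example, to request 2 GB per map task (illustrative snippet;
* <code>conf</code> is this job's JobConf):</p>
* <p><blockquote><pre>
* conf.setMemoryForMapTask(2048);
* </pre></blockquote></p>
*
* @param mem memory required to run a map task of the job, in MB.
*/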
1837 public void setMemoryForMapTask(long mem) {
1838 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1839 // In case that M/R 1.x applications use the old property name
1840 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
1841 }
1842
1843 /**
1844 * Get memory required to run a reduce task of the job, in MB.
1845 *
1846 * If a value is specified in the configuration, it is returned.
1847 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
1848 * <p/>
1849 * For backward compatibility, if the job configuration sets the
1850 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1851 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1852 * after converting it from bytes to MB.
1853 * @return memory required to run a reduce task of the job, in MB,
1854 * or {@link #DISABLED_MEMORY_LIMIT} if unset.
1855 */
1856 public long getMemoryForReduceTask() {
1857 long value = getDeprecatedMemoryValue();
1858 if (value == DISABLED_MEMORY_LIMIT) {
1859 value = normalizeMemoryConfigValue(
1860 getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
1861 DISABLED_MEMORY_LIMIT));
1862 }
1863 // In case that M/R 1.x applications use the old property name
1864 if (value == DISABLED_MEMORY_LIMIT) {
1865 value = normalizeMemoryConfigValue(
1866 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1867 DISABLED_MEMORY_LIMIT));
1868 }
1869 return value;
1870 }
1871
1872 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1873 // converted into MBs.
1874 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1875 // value.
1876 private long getDeprecatedMemoryValue() {
1877 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
1878 DISABLED_MEMORY_LIMIT);
1879 oldValue = normalizeMemoryConfigValue(oldValue);
1880 if (oldValue != DISABLED_MEMORY_LIMIT) {
1881 oldValue /= (1024*1024);
1882 }
1883 return oldValue;
1884 }
1885
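/**
* Set the memory required to run a reduce task of the job, in MB.
* For backward compatibility, the value is also set under the old
* M/R 1.x property name.
*
* @param mem memory required to run a reduce task of the job, in MB.
*/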
1886 public void setMemoryForReduceTask(long mem) {
1887 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1888 // In case that M/R 1.x applications use the old property name
1889 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
1890 }
1891
1892 /**
1893 * Return the name of the queue to which this job is submitted.
1894 * Defaults to 'default'.
1895 *
1896 * @return name of the queue
1897 */
1898 public String getQueueName() {
1899 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1900 }
1901
1902 /**
1903 * Set the name of the queue to which this job should be submitted.
1904 *
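* <p>For example (illustrative; the queue must already exist on the
* cluster): <code>conf.setQueueName("research");</code></p>
*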
1905 * @param queueName Name of the queue
1906 */
1907 public void setQueueName(String queueName) {
1908 set(JobContext.QUEUE_NAME, queueName);
1909 }
1910
1911 /**
* Normalize a memory configuration value: negative values are mapped to
* {@link #DISABLED_MEMORY_LIMIT}.
*
* @param val the configured memory value
* @return the normalized value
1916 */
1917 public static long normalizeMemoryConfigValue(long val) {
1918 if (val < 0) {
1919 val = DISABLED_MEMORY_LIMIT;
1920 }
1921 return val;
1922 }
1923
1924 /**
1925 * Compute the number of slots required to run a single map task-attempt
1926 * of this job.
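* <p>For example, a map task configured to use 3072 MB on a cluster whose
* map slot size is 1024 MB requires ceil(3072 / 1024) = 3 slots.</p>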
1927 * @param slotSizePerMap cluster-wide value of the amount of memory required
1928 * to run a map-task
* @return the number of slots required to run a single map task-attempt,
* or 1 if memory parameters are disabled.
1931 */
int computeNumSlotsPerMap(long slotSizePerMap) {
if ((slotSizePerMap == DISABLED_MEMORY_LIMIT) ||
(getMemoryForMapTask() == DISABLED_MEMORY_LIMIT)) {
return 1;
}
return (int) Math.ceil((float) getMemoryForMapTask() / (float) slotSizePerMap);
}
1939
1940 /**
1941 * Compute the number of slots required to run a single reduce task-attempt
1942 * of this job.
1943 * @param slotSizePerReduce cluster-wide value of the amount of memory
1944 * required to run a reduce-task
* @return the number of slots required to run a single reduce task-attempt,
* or 1 if memory parameters are disabled.
1947 */
int computeNumSlotsPerReduce(long slotSizePerReduce) {
if ((slotSizePerReduce == DISABLED_MEMORY_LIMIT) ||
(getMemoryForReduceTask() == DISABLED_MEMORY_LIMIT)) {
return 1;
}
return (int) Math.ceil(
(float) getMemoryForReduceTask() / (float) slotSizePerReduce);
}
1956
1957 /**
1958 * Find a jar that contains a class of the same name, if any.
1959 * It will return a jar file, even if that is not the first thing
1960 * on the class path that has a class with the same name.
1961 *
1962 * @param my_class the class to find.
1963 * @return a jar file that contains the class, or null.
1965 */
1966 public static String findContainingJar(Class my_class) {
1967 return ClassUtil.findContainingJar(my_class);
1968 }
1969
1970 /**
1971 * Get the memory required to run a task of this job, in bytes. See
1972 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
1973 * <p/>
1974 * This method is deprecated. Now, different memory limits can be
1975 * set for map and reduce tasks of a job, in MB.
1976 * <p/>
1977 * For backward compatibility, if the job configuration sets the
1978 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1979 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
1980 * Otherwise, this method will return the larger of the values returned by
1981 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1982 * after converting them into bytes.
1983 *
1984 * @return Memory required to run a task of this job, in bytes,
1985 * or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1986 * @see #setMaxVirtualMemoryForTask(long)
1987 * @deprecated Use {@link #getMemoryForMapTask()} and
1988 * {@link #getMemoryForReduceTask()}
1989 */
1990 @Deprecated
1991 public long getMaxVirtualMemoryForTask() {
1992 LOG.warn(
1993 "getMaxVirtualMemoryForTask() is deprecated. " +
1994 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1995
1996 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1997 value = normalizeMemoryConfigValue(value);
1998 if (value == DISABLED_MEMORY_LIMIT) {
1999 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
2000 value = normalizeMemoryConfigValue(value);
2001 if (value != DISABLED_MEMORY_LIMIT) {
2002 value *= 1024*1024;
2003 }
2004 }
2005 return value;
2006 }
2007
2008 /**
2009 * Set the maximum amount of memory any task of this job can use. See
2010 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
2011 * <p/>
* <code>mapred.task.maxvmem</code> is split into
* <code>mapreduce.map.memory.mb</code> and
* <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
* <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
* are in MB.
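* <p>For example, calling <code>setMaxVirtualMemoryForTask(2147483648L)</code>
* (2 GB, in bytes) sets both new keys to 2048 (MB), assuming
* <code>mapred.task.maxvmem</code> has not itself been set.</p>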
2018 *
2019 * @param vmem Maximum amount of virtual memory in bytes any task of this job
2020 * can use.
2021 * @see #getMaxVirtualMemoryForTask()
2022 * @deprecated
* Use {@link #setMemoryForMapTask(long mem)} and
* {@link #setMemoryForReduceTask(long mem)}
2025 */
2026 @Deprecated
2027 public void setMaxVirtualMemoryForTask(long vmem) {
LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
"Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
return;
}

if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
setMemoryForMapTask(vmem / (1024 * 1024)); // converting bytes to MB
setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
} else {
this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
}
2041 }
2042
2043 /**
* @deprecated this method is deprecated and no longer in use.
2045 */
2046 @Deprecated
2047 public long getMaxPhysicalMemoryForTask() {
2048 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
2049 + " Refer to the APIs getMemoryForMapTask() and"
2050 + " getMemoryForReduceTask() for details.");
2051 return -1;
2052 }
2053
/**
* @deprecated this method is deprecated and no longer in use; the value
* set is ignored.
*/
2057 @Deprecated
2058 public void setMaxPhysicalMemoryForTask(long mem) {
2059 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
2060 + " The value set is ignored. Refer to "
2061 + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
2062 }
2063
2064 static String deprecatedString(String key) {
2065 return "The variable " + key + " is no longer used.";
2066 }
2067
2068 private void checkAndWarnDeprecation() {
if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
+ " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
+ " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
}
if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
}
if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
}
if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
}
2083 }
2084
2086 }
2087