001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 022 import java.io.IOException; 023 import java.util.regex.Pattern; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceAudience.Private; 029 import org.apache.hadoop.classification.InterfaceStability; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.fs.FileStatus; 032 import org.apache.hadoop.fs.FileSystem; 033 import org.apache.hadoop.fs.Path; 034 import org.apache.hadoop.io.LongWritable; 035 import org.apache.hadoop.io.RawComparator; 036 import org.apache.hadoop.io.Text; 037 import org.apache.hadoop.io.WritableComparable; 038 import org.apache.hadoop.io.WritableComparator; 039 import org.apache.hadoop.io.compress.CompressionCodec; 040 import org.apache.hadoop.mapred.lib.HashPartitioner; 041 import org.apache.hadoop.mapred.lib.IdentityMapper; 042 import org.apache.hadoop.mapred.lib.IdentityReducer; 043 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 044 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 045 import org.apache.hadoop.mapreduce.MRConfig; 046 import org.apache.hadoop.mapreduce.MRJobConfig; 047 import org.apache.hadoop.mapreduce.filecache.DistributedCache; 048 import org.apache.hadoop.mapreduce.util.ConfigUtil; 049 import org.apache.hadoop.security.Credentials; 050 import org.apache.hadoop.util.ClassUtil; 051 import org.apache.hadoop.util.ReflectionUtils; 052 import org.apache.hadoop.util.Tool; 053 import org.apache.log4j.Level; 054 055 /** 056 * A map/reduce job configuration. 057 * 058 * <p><code>JobConf</code> is the primary interface for a user to describe a 059 * map-reduce job to the Hadoop framework for execution. The framework tries to 060 * faithfully execute the job as-is described by <code>JobConf</code>, however: 061 * <ol> 062 * <li> 063 * Some configuration parameters might have been marked as 064 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 065 * final</a> by administrators and hence cannot be altered. 066 * </li> 067 * <li> 068 * While some job parameters are straight-forward to set 069 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 070 * with the rest of the framework and/or job-configuration and is relatively 071 * more complex for the user to control finely 072 * (e.g. {@link #setNumMapTasks(int)}). 073 * </li> 074 * </ol></p> 075 * 076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 078 * {@link OutputFormat} implementations to be used etc. 079 * 080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 081 * of the job such as <code>Comparator</code>s to be used, files to be put in 082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 083 * are to be compressed (and how), debugability via user-provided scripts 084 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}), 085 * for doing post-processing on task logs, task's stdout, stderr, syslog. 086 * and etc.</p> 087 * 088 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p> 089 * <p><blockquote><pre> 090 * // Create a new JobConf 091 * JobConf job = new JobConf(new Configuration(), MyJob.class); 092 * 093 * // Specify various job-specific parameters 094 * job.setJobName("myjob"); 095 * 096 * FileInputFormat.setInputPaths(job, new Path("in")); 097 * FileOutputFormat.setOutputPath(job, new Path("out")); 098 * 099 * job.setMapperClass(MyJob.MyMapper.class); 100 * job.setCombinerClass(MyJob.MyReducer.class); 101 * job.setReducerClass(MyJob.MyReducer.class); 102 * 103 * job.setInputFormat(SequenceFileInputFormat.class); 104 * job.setOutputFormat(SequenceFileOutputFormat.class); 105 * </pre></blockquote></p> 106 * 107 * @see JobClient 108 * @see ClusterStatus 109 * @see Tool 110 * @see DistributedCache 111 */ 112 @InterfaceAudience.Public 113 @InterfaceStability.Stable 114 public class JobConf extends Configuration { 115 116 private static final Log LOG = LogFactory.getLog(JobConf.class); 117 118 static{ 119 ConfigUtil.loadResources(); 120 } 121 122 /** 123 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and 124 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 125 */ 126 @Deprecated 127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY = 128 "mapred.task.maxvmem"; 129 130 /** 131 * @deprecated 132 */ 133 @Deprecated 134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = 135 "mapred.task.limit.maxvmem"; 136 137 /** 138 * @deprecated 139 */ 140 @Deprecated 141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = 142 "mapred.task.default.maxvmem"; 143 144 /** 145 * @deprecated 146 */ 147 @Deprecated 148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY = 149 "mapred.task.maxpmem"; 150 151 /** 152 * A value which if set for memory related configuration options, 153 * indicates that the options are turned off. 154 * Deprecated because it makes no sense in the context of MR2. 155 */ 156 @Deprecated 157 public static final long DISABLED_MEMORY_LIMIT = -1L; 158 159 /** 160 * Property name for the configuration property mapreduce.cluster.local.dir 161 */ 162 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR; 163 164 /** 165 * Name of the queue to which jobs will be submitted, if no queue 166 * name is mentioned. 167 */ 168 public static final String DEFAULT_QUEUE_NAME = "default"; 169 170 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY = 171 JobContext.MAP_MEMORY_MB; 172 173 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY = 174 JobContext.REDUCE_MEMORY_MB; 175 176 /** 177 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 178 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} 179 */ 180 @Deprecated 181 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 182 "mapred.job.map.memory.mb"; 183 184 /** 185 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 186 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 187 */ 188 @Deprecated 189 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = 190 "mapred.job.reduce.memory.mb"; 191 192 /** Pattern for the default unpacking behavior for job jars */ 193 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = 194 Pattern.compile("(?:classes/|lib/).*"); 195 196 /** 197 * Configuration key to set the java command line options for the child 198 * map and reduce tasks. 199 * 200 * Java opts for the task tracker child processes. 201 * The following symbol, if present, will be interpolated: @taskid@. 202 * It is replaced by current TaskID. Any other occurrences of '@' will go 203 * unchanged. 204 * For example, to enable verbose gc logging to a file named for the taskid in 205 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 206 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 207 * 208 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 209 * other environment variables to the child processes. 210 * 211 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 212 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} 213 */ 214 @Deprecated 215 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; 216 217 /** 218 * Configuration key to set the java command line options for the map tasks. 219 * 220 * Java opts for the task tracker child map processes. 221 * The following symbol, if present, will be interpolated: @taskid@. 222 * It is replaced by current TaskID. Any other occurrences of '@' will go 223 * unchanged. 224 * For example, to enable verbose gc logging to a file named for the taskid in 225 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 226 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 227 * 228 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 229 * other environment variables to the map processes. 230 */ 231 public static final String MAPRED_MAP_TASK_JAVA_OPTS = 232 JobContext.MAP_JAVA_OPTS; 233 234 /** 235 * Configuration key to set the java command line options for the reduce tasks. 236 * 237 * Java opts for the task tracker child reduce processes. 238 * The following symbol, if present, will be interpolated: @taskid@. 239 * It is replaced by current TaskID. Any other occurrences of '@' will go 240 * unchanged. 241 * For example, to enable verbose gc logging to a file named for the taskid in 242 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 243 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 244 * 245 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 246 * pass process environment variables to the reduce processes. 247 */ 248 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 249 JobContext.REDUCE_JAVA_OPTS; 250 251 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m"; 252 253 /** 254 * @deprecated 255 * Configuration key to set the maximum virtual memory available to the child 256 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no 257 * longer have any effect. 258 */ 259 @Deprecated 260 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; 261 262 /** 263 * @deprecated 264 * Configuration key to set the maximum virtual memory available to the 265 * map tasks (in kilo-bytes). This has been deprecated and will no 266 * longer have any effect. 267 */ 268 @Deprecated 269 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit"; 270 271 /** 272 * @deprecated 273 * Configuration key to set the maximum virtual memory available to the 274 * reduce tasks (in kilo-bytes). This has been deprecated and will no 275 * longer have any effect. 276 */ 277 @Deprecated 278 public static final String MAPRED_REDUCE_TASK_ULIMIT = 279 "mapreduce.reduce.ulimit"; 280 281 282 /** 283 * Configuration key to set the environment of the child map/reduce tasks. 284 * 285 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 286 * reference existing environment variables via <code>$key</code> on 287 * Linux or <code>%key%</code> on Windows. 288 * 289 * Example: 290 * <ul> 291 * <li> A=foo - This will set the env variable A to foo. </li> 292 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 293 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 294 * </ul> 295 * 296 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 297 * {@link #MAPRED_REDUCE_TASK_ENV} 298 */ 299 @Deprecated 300 public static final String MAPRED_TASK_ENV = "mapred.child.env"; 301 302 /** 303 * Configuration key to set the environment of the child map tasks. 304 * 305 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 306 * reference existing environment variables via <code>$key</code> on 307 * Linux or <code>%key%</code> on Windows. 308 * 309 * Example: 310 * <ul> 311 * <li> A=foo - This will set the env variable A to foo. </li> 312 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 313 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 314 * </ul> 315 */ 316 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV; 317 318 /** 319 * Configuration key to set the environment of the child reduce tasks. 320 * 321 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 322 * reference existing environment variables via <code>$key</code> on 323 * Linux or <code>%key%</code> on Windows. 324 * 325 * Example: 326 * <ul> 327 * <li> A=foo - This will set the env variable A to foo. </li> 328 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 329 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 330 * </ul> 331 */ 332 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV; 333 334 private Credentials credentials = new Credentials(); 335 336 /** 337 * Configuration key to set the logging {@link Level} for the map task. 338 * 339 * The allowed logging levels are: 340 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 341 */ 342 public static final String MAPRED_MAP_TASK_LOG_LEVEL = 343 JobContext.MAP_LOG_LEVEL; 344 345 /** 346 * Configuration key to set the logging {@link Level} for the reduce task. 347 * 348 * The allowed logging levels are: 349 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 350 */ 351 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 352 JobContext.REDUCE_LOG_LEVEL; 353 354 /** 355 * Default logging level for map/reduce tasks. 356 */ 357 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 358 359 /** 360 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 361 * use {@link MRJobConfig#WORKFLOW_ID} instead 362 */ 363 @Deprecated 364 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID; 365 366 /** 367 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 368 * use {@link MRJobConfig#WORKFLOW_NAME} instead 369 */ 370 @Deprecated 371 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME; 372 373 /** 374 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 375 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead 376 */ 377 @Deprecated 378 public static final String WORKFLOW_NODE_NAME = 379 MRJobConfig.WORKFLOW_NODE_NAME; 380 381 /** 382 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 383 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead 384 */ 385 @Deprecated 386 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING = 387 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING; 388 389 /** 390 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 391 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead 392 */ 393 @Deprecated 394 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = 395 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN; 396 397 /** 398 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 399 * use {@link MRJobConfig#WORKFLOW_TAGS} instead 400 */ 401 @Deprecated 402 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS; 403 404 /** 405 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 406 * not use it 407 */ 408 @Deprecated 409 public static final String MAPREDUCE_RECOVER_JOB = 410 "mapreduce.job.restart.recover"; 411 412 /** 413 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 414 * not use it 415 */ 416 @Deprecated 417 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 418 419 /** 420 * Construct a map/reduce job configuration. 421 */ 422 public JobConf() { 423 checkAndWarnDeprecation(); 424 } 425 426 /** 427 * Construct a map/reduce job configuration. 428 * 429 * @param exampleClass a class whose containing jar is used as the job's jar. 430 */ 431 public JobConf(Class exampleClass) { 432 setJarByClass(exampleClass); 433 checkAndWarnDeprecation(); 434 } 435 436 /** 437 * Construct a map/reduce job configuration. 438 * 439 * @param conf a Configuration whose settings will be inherited. 440 */ 441 public JobConf(Configuration conf) { 442 super(conf); 443 444 if (conf instanceof JobConf) { 445 JobConf that = (JobConf)conf; 446 credentials = that.credentials; 447 } 448 449 checkAndWarnDeprecation(); 450 } 451 452 453 /** Construct a map/reduce job configuration. 454 * 455 * @param conf a Configuration whose settings will be inherited. 456 * @param exampleClass a class whose containing jar is used as the job's jar. 457 */ 458 public JobConf(Configuration conf, Class exampleClass) { 459 this(conf); 460 setJarByClass(exampleClass); 461 } 462 463 464 /** Construct a map/reduce configuration. 465 * 466 * @param config a Configuration-format XML job description file. 467 */ 468 public JobConf(String config) { 469 this(new Path(config)); 470 } 471 472 /** Construct a map/reduce configuration. 473 * 474 * @param config a Configuration-format XML job description file. 475 */ 476 public JobConf(Path config) { 477 super(); 478 addResource(config); 479 checkAndWarnDeprecation(); 480 } 481 482 /** A new map/reduce configuration where the behavior of reading from the 483 * default resources can be turned off. 484 * <p/> 485 * If the parameter {@code loadDefaults} is false, the new instance 486 * will not load resources from the default files. 487 * 488 * @param loadDefaults specifies whether to load from the default files 489 */ 490 public JobConf(boolean loadDefaults) { 491 super(loadDefaults); 492 checkAndWarnDeprecation(); 493 } 494 495 /** 496 * Get credentials for the job. 497 * @return credentials for the job 498 */ 499 public Credentials getCredentials() { 500 return credentials; 501 } 502 503 @Private 504 public void setCredentials(Credentials credentials) { 505 this.credentials = credentials; 506 } 507 508 /** 509 * Get the user jar for the map-reduce job. 510 * 511 * @return the user jar for the map-reduce job. 512 */ 513 public String getJar() { return get(JobContext.JAR); } 514 515 /** 516 * Set the user jar for the map-reduce job. 517 * 518 * @param jar the user jar for the map-reduce job. 519 */ 520 public void setJar(String jar) { set(JobContext.JAR, jar); } 521 522 /** 523 * Get the pattern for jar contents to unpack on the tasktracker 524 */ 525 public Pattern getJarUnpackPattern() { 526 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 527 } 528 529 530 /** 531 * Set the job's jar file by finding an example class location. 532 * 533 * @param cls the example class. 534 */ 535 public void setJarByClass(Class cls) { 536 String jar = ClassUtil.findContainingJar(cls); 537 if (jar != null) { 538 setJar(jar); 539 } 540 } 541 542 public String[] getLocalDirs() throws IOException { 543 return getTrimmedStrings(MRConfig.LOCAL_DIR); 544 } 545 546 /** 547 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 548 */ 549 @Deprecated 550 public void deleteLocalFiles() throws IOException { 551 String[] localDirs = getLocalDirs(); 552 for (int i = 0; i < localDirs.length; i++) { 553 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 554 } 555 } 556 557 public void deleteLocalFiles(String subdir) throws IOException { 558 String[] localDirs = getLocalDirs(); 559 for (int i = 0; i < localDirs.length; i++) { 560 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 561 } 562 } 563 564 /** 565 * Constructs a local file name. Files are distributed among configured 566 * local directories. 567 */ 568 public Path getLocalPath(String pathString) throws IOException { 569 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 570 } 571 572 /** 573 * Get the reported username for this job. 574 * 575 * @return the username 576 */ 577 public String getUser() { 578 return get(JobContext.USER_NAME); 579 } 580 581 /** 582 * Set the reported username for this job. 583 * 584 * @param user the username for this job. 585 */ 586 public void setUser(String user) { 587 set(JobContext.USER_NAME, user); 588 } 589 590 591 592 /** 593 * Set whether the framework should keep the intermediate files for 594 * failed tasks. 595 * 596 * @param keep <code>true</code> if framework should keep the intermediate files 597 * for failed tasks, <code>false</code> otherwise. 598 * 599 */ 600 public void setKeepFailedTaskFiles(boolean keep) { 601 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 602 } 603 604 /** 605 * Should the temporary files for failed tasks be kept? 606 * 607 * @return should the files be kept? 608 */ 609 public boolean getKeepFailedTaskFiles() { 610 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 611 } 612 613 /** 614 * Set a regular expression for task names that should be kept. 615 * The regular expression ".*_m_000123_0" would keep the files 616 * for the first instance of map 123 that ran. 617 * 618 * @param pattern the java.util.regex.Pattern to match against the 619 * task names. 620 */ 621 public void setKeepTaskFilesPattern(String pattern) { 622 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 623 } 624 625 /** 626 * Get the regular expression that is matched against the task names 627 * to see if we need to keep the files. 628 * 629 * @return the pattern as a string, if it was set, othewise null. 630 */ 631 public String getKeepTaskFilesPattern() { 632 return get(JobContext.PRESERVE_FILES_PATTERN); 633 } 634 635 /** 636 * Set the current working directory for the default file system. 637 * 638 * @param dir the new current working directory. 639 */ 640 public void setWorkingDirectory(Path dir) { 641 dir = new Path(getWorkingDirectory(), dir); 642 set(JobContext.WORKING_DIR, dir.toString()); 643 } 644 645 /** 646 * Get the current working directory for the default file system. 647 * 648 * @return the directory name. 649 */ 650 public Path getWorkingDirectory() { 651 String name = get(JobContext.WORKING_DIR); 652 if (name != null) { 653 return new Path(name); 654 } else { 655 try { 656 Path dir = FileSystem.get(this).getWorkingDirectory(); 657 set(JobContext.WORKING_DIR, dir.toString()); 658 return dir; 659 } catch (IOException e) { 660 throw new RuntimeException(e); 661 } 662 } 663 } 664 665 /** 666 * Sets the number of tasks that a spawned task JVM should run 667 * before it exits 668 * @param numTasks the number of tasks to execute; defaults to 1; 669 * -1 signifies no limit 670 */ 671 public void setNumTasksToExecutePerJvm(int numTasks) { 672 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 673 } 674 675 /** 676 * Get the number of tasks that a spawned JVM should execute 677 */ 678 public int getNumTasksToExecutePerJvm() { 679 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 680 } 681 682 /** 683 * Get the {@link InputFormat} implementation for the map-reduce job, 684 * defaults to {@link TextInputFormat} if not specified explicity. 685 * 686 * @return the {@link InputFormat} implementation for the map-reduce job. 687 */ 688 public InputFormat getInputFormat() { 689 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 690 TextInputFormat.class, 691 InputFormat.class), 692 this); 693 } 694 695 /** 696 * Set the {@link InputFormat} implementation for the map-reduce job. 697 * 698 * @param theClass the {@link InputFormat} implementation for the map-reduce 699 * job. 700 */ 701 public void setInputFormat(Class<? extends InputFormat> theClass) { 702 setClass("mapred.input.format.class", theClass, InputFormat.class); 703 } 704 705 /** 706 * Get the {@link OutputFormat} implementation for the map-reduce job, 707 * defaults to {@link TextOutputFormat} if not specified explicity. 708 * 709 * @return the {@link OutputFormat} implementation for the map-reduce job. 710 */ 711 public OutputFormat getOutputFormat() { 712 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 713 TextOutputFormat.class, 714 OutputFormat.class), 715 this); 716 } 717 718 /** 719 * Get the {@link OutputCommitter} implementation for the map-reduce job, 720 * defaults to {@link FileOutputCommitter} if not specified explicitly. 721 * 722 * @return the {@link OutputCommitter} implementation for the map-reduce job. 723 */ 724 public OutputCommitter getOutputCommitter() { 725 return (OutputCommitter)ReflectionUtils.newInstance( 726 getClass("mapred.output.committer.class", FileOutputCommitter.class, 727 OutputCommitter.class), this); 728 } 729 730 /** 731 * Set the {@link OutputCommitter} implementation for the map-reduce job. 732 * 733 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 734 * job. 735 */ 736 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 737 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 738 } 739 740 /** 741 * Set the {@link OutputFormat} implementation for the map-reduce job. 742 * 743 * @param theClass the {@link OutputFormat} implementation for the map-reduce 744 * job. 745 */ 746 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 747 setClass("mapred.output.format.class", theClass, OutputFormat.class); 748 } 749 750 /** 751 * Should the map outputs be compressed before transfer? 752 * 753 * @param compress should the map outputs be compressed? 754 */ 755 public void setCompressMapOutput(boolean compress) { 756 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 757 } 758 759 /** 760 * Are the outputs of the maps be compressed? 761 * 762 * @return <code>true</code> if the outputs of the maps are to be compressed, 763 * <code>false</code> otherwise. 764 */ 765 public boolean getCompressMapOutput() { 766 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 767 } 768 769 /** 770 * Set the given class as the {@link CompressionCodec} for the map outputs. 771 * 772 * @param codecClass the {@link CompressionCodec} class that will compress 773 * the map outputs. 774 */ 775 public void 776 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 777 setCompressMapOutput(true); 778 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 779 CompressionCodec.class); 780 } 781 782 /** 783 * Get the {@link CompressionCodec} for compressing the map outputs. 784 * 785 * @param defaultValue the {@link CompressionCodec} to return if not set 786 * @return the {@link CompressionCodec} class that should be used to compress the 787 * map outputs. 788 * @throws IllegalArgumentException if the class was specified, but not found 789 */ 790 public Class<? extends CompressionCodec> 791 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 792 Class<? extends CompressionCodec> codecClass = defaultValue; 793 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 794 if (name != null) { 795 try { 796 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 797 } catch (ClassNotFoundException e) { 798 throw new IllegalArgumentException("Compression codec " + name + 799 " was not found.", e); 800 } 801 } 802 return codecClass; 803 } 804 805 /** 806 * Get the key class for the map output data. If it is not set, use the 807 * (final) output key class. This allows the map output key class to be 808 * different than the final output key class. 809 * 810 * @return the map output key class. 811 */ 812 public Class<?> getMapOutputKeyClass() { 813 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 814 if (retv == null) { 815 retv = getOutputKeyClass(); 816 } 817 return retv; 818 } 819 820 /** 821 * Set the key class for the map output data. This allows the user to 822 * specify the map output key class to be different than the final output 823 * value class. 824 * 825 * @param theClass the map output key class. 826 */ 827 public void setMapOutputKeyClass(Class<?> theClass) { 828 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 829 } 830 831 /** 832 * Get the value class for the map output data. If it is not set, use the 833 * (final) output value class This allows the map output value class to be 834 * different than the final output value class. 835 * 836 * @return the map output value class. 837 */ 838 public Class<?> getMapOutputValueClass() { 839 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 840 Object.class); 841 if (retv == null) { 842 retv = getOutputValueClass(); 843 } 844 return retv; 845 } 846 847 /** 848 * Set the value class for the map output data. This allows the user to 849 * specify the map output value class to be different than the final output 850 * value class. 851 * 852 * @param theClass the map output value class. 853 */ 854 public void setMapOutputValueClass(Class<?> theClass) { 855 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 856 } 857 858 /** 859 * Get the key class for the job output data. 860 * 861 * @return the key class for the job output data. 862 */ 863 public Class<?> getOutputKeyClass() { 864 return getClass(JobContext.OUTPUT_KEY_CLASS, 865 LongWritable.class, Object.class); 866 } 867 868 /** 869 * Set the key class for the job output data. 870 * 871 * @param theClass the key class for the job output data. 872 */ 873 public void setOutputKeyClass(Class<?> theClass) { 874 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 875 } 876 877 /** 878 * Get the {@link RawComparator} comparator used to compare keys. 879 * 880 * @return the {@link RawComparator} comparator used to compare keys. 881 */ 882 public RawComparator getOutputKeyComparator() { 883 Class<? extends RawComparator> theClass = getClass( 884 JobContext.KEY_COMPARATOR, null, RawComparator.class); 885 if (theClass != null) 886 return ReflectionUtils.newInstance(theClass, this); 887 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 888 } 889 890 /** 891 * Set the {@link RawComparator} comparator used to compare keys. 892 * 893 * @param theClass the {@link RawComparator} comparator used to 894 * compare keys. 895 * @see #setOutputValueGroupingComparator(Class) 896 */ 897 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 898 setClass(JobContext.KEY_COMPARATOR, 899 theClass, RawComparator.class); 900 } 901 902 /** 903 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 904 * 905 * @param keySpec the key specification of the form -k pos1[,pos2], where, 906 * pos is of the form f[.c][opts], where f is the number 907 * of the key field to use, and c is the number of the first character from 908 * the beginning of the field. Fields and character posns are numbered 909 * starting with 1; a character position of zero in pos2 indicates the 910 * field's last character. If '.c' is omitted from pos1, it defaults to 1 911 * (the beginning of the field); if omitted from pos2, it defaults to 0 912 * (the end of the field). opts are ordering options. The supported options 913 * are: 914 * -n, (Sort numerically) 915 * -r, (Reverse the result of comparison) 916 */ 917 public void setKeyFieldComparatorOptions(String keySpec) { 918 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 919 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 920 } 921 922 /** 923 * Get the {@link KeyFieldBasedComparator} options 924 */ 925 public String getKeyFieldComparatorOption() { 926 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 927 } 928 929 /** 930 * Set the {@link KeyFieldBasedPartitioner} options used for 931 * {@link Partitioner} 932 * 933 * @param keySpec the key specification of the form -k pos1[,pos2], where, 934 * pos is of the form f[.c][opts], where f is the number 935 * of the key field to use, and c is the number of the first character from 936 * the beginning of the field. Fields and character posns are numbered 937 * starting with 1; a character position of zero in pos2 indicates the 938 * field's last character. If '.c' is omitted from pos1, it defaults to 1 939 * (the beginning of the field); if omitted from pos2, it defaults to 0 940 * (the end of the field). 941 */ 942 public void setKeyFieldPartitionerOptions(String keySpec) { 943 setPartitionerClass(KeyFieldBasedPartitioner.class); 944 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 945 } 946 947 /** 948 * Get the {@link KeyFieldBasedPartitioner} options 949 */ 950 public String getKeyFieldPartitionerOption() { 951 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 952 } 953 954 /** 955 * Get the user defined {@link WritableComparable} comparator for 956 * grouping keys of inputs to the combiner. 957 * 958 * @return comparator set by the user for grouping values. 959 * @see #setCombinerKeyGroupingComparator(Class) for details. 960 */ 961 public RawComparator getCombinerKeyGroupingComparator() { 962 Class<? extends RawComparator> theClass = getClass( 963 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); 964 if (theClass == null) { 965 return getOutputKeyComparator(); 966 } 967 968 return ReflectionUtils.newInstance(theClass, this); 969 } 970 971 /** 972 * Get the user defined {@link WritableComparable} comparator for 973 * grouping keys of inputs to the reduce. 974 * 975 * @return comparator set by the user for grouping values. 976 * @see #setOutputValueGroupingComparator(Class) for details. 977 */ 978 public RawComparator getOutputValueGroupingComparator() { 979 Class<? extends RawComparator> theClass = getClass( 980 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 981 if (theClass == null) { 982 return getOutputKeyComparator(); 983 } 984 985 return ReflectionUtils.newInstance(theClass, this); 986 } 987 988 /** 989 * Set the user defined {@link RawComparator} comparator for 990 * grouping keys in the input to the combiner. 991 * <p/> 992 * <p>This comparator should be provided if the equivalence rules for keys 993 * for sorting the intermediates are different from those for grouping keys 994 * before each call to 995 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 996 * <p/> 997 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 998 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 999 * <p/> 1000 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1001 * how keys are sorted, this can be used in conjunction to simulate 1002 * <i>secondary sort on values</i>.</p> 1003 * <p/> 1004 * <p><i>Note</i>: This is not a guarantee of the combiner sort being 1005 * <i>stable</i> in any sense. (In any case, with the order of available 1006 * map-outputs to the combiner being non-deterministic, it wouldn't make 1007 * that much sense.)</p> 1008 * 1009 * @param theClass the comparator class to be used for grouping keys for the 1010 * combiner. It should implement <code>RawComparator</code>. 1011 * @see #setOutputKeyComparatorClass(Class) 1012 */ 1013 public void setCombinerKeyGroupingComparator( 1014 Class<? extends RawComparator> theClass) { 1015 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, 1016 theClass, RawComparator.class); 1017 } 1018 1019 /** 1020 * Set the user defined {@link RawComparator} comparator for 1021 * grouping keys in the input to the reduce. 1022 * 1023 * <p>This comparator should be provided if the equivalence rules for keys 1024 * for sorting the intermediates are different from those for grouping keys 1025 * before each call to 1026 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 1027 * 1028 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 1029 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 1030 * 1031 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1032 * how keys are sorted, this can be used in conjunction to simulate 1033 * <i>secondary sort on values</i>.</p> 1034 * 1035 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 1036 * <i>stable</i> in any sense. (In any case, with the order of available 1037 * map-outputs to the reduce being non-deterministic, it wouldn't make 1038 * that much sense.)</p> 1039 * 1040 * @param theClass the comparator class to be used for grouping keys. 1041 * It should implement <code>RawComparator</code>. 1042 * @see #setOutputKeyComparatorClass(Class) 1043 * @see #setCombinerKeyGroupingComparator(Class) 1044 */ 1045 public void setOutputValueGroupingComparator( 1046 Class<? extends RawComparator> theClass) { 1047 setClass(JobContext.GROUP_COMPARATOR_CLASS, 1048 theClass, RawComparator.class); 1049 } 1050 1051 /** 1052 * Should the framework use the new context-object code for running 1053 * the mapper? 1054 * @return true, if the new api should be used 1055 */ 1056 public boolean getUseNewMapper() { 1057 return getBoolean("mapred.mapper.new-api", false); 1058 } 1059 /** 1060 * Set whether the framework should use the new api for the mapper. 1061 * This is the default for jobs submitted with the new Job api. 1062 * @param flag true, if the new api should be used 1063 */ 1064 public void setUseNewMapper(boolean flag) { 1065 setBoolean("mapred.mapper.new-api", flag); 1066 } 1067 1068 /** 1069 * Should the framework use the new context-object code for running 1070 * the reducer? 1071 * @return true, if the new api should be used 1072 */ 1073 public boolean getUseNewReducer() { 1074 return getBoolean("mapred.reducer.new-api", false); 1075 } 1076 /** 1077 * Set whether the framework should use the new api for the reducer. 1078 * This is the default for jobs submitted with the new Job api. 1079 * @param flag true, if the new api should be used 1080 */ 1081 public void setUseNewReducer(boolean flag) { 1082 setBoolean("mapred.reducer.new-api", flag); 1083 } 1084 1085 /** 1086 * Get the value class for job outputs. 1087 * 1088 * @return the value class for job outputs. 1089 */ 1090 public Class<?> getOutputValueClass() { 1091 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 1092 } 1093 1094 /** 1095 * Set the value class for job outputs. 1096 * 1097 * @param theClass the value class for job outputs. 1098 */ 1099 public void setOutputValueClass(Class<?> theClass) { 1100 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 1101 } 1102 1103 /** 1104 * Get the {@link Mapper} class for the job. 1105 * 1106 * @return the {@link Mapper} class for the job. 1107 */ 1108 public Class<? extends Mapper> getMapperClass() { 1109 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 1110 } 1111 1112 /** 1113 * Set the {@link Mapper} class for the job. 1114 * 1115 * @param theClass the {@link Mapper} class for the job. 1116 */ 1117 public void setMapperClass(Class<? extends Mapper> theClass) { 1118 setClass("mapred.mapper.class", theClass, Mapper.class); 1119 } 1120 1121 /** 1122 * Get the {@link MapRunnable} class for the job. 1123 * 1124 * @return the {@link MapRunnable} class for the job. 1125 */ 1126 public Class<? extends MapRunnable> getMapRunnerClass() { 1127 return getClass("mapred.map.runner.class", 1128 MapRunner.class, MapRunnable.class); 1129 } 1130 1131 /** 1132 * Expert: Set the {@link MapRunnable} class for the job. 1133 * 1134 * Typically used to exert greater control on {@link Mapper}s. 1135 * 1136 * @param theClass the {@link MapRunnable} class for the job. 1137 */ 1138 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1139 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1140 } 1141 1142 /** 1143 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1144 * to be sent to the {@link Reducer}s. 1145 * 1146 * @return the {@link Partitioner} used to partition map-outputs. 1147 */ 1148 public Class<? extends Partitioner> getPartitionerClass() { 1149 return getClass("mapred.partitioner.class", 1150 HashPartitioner.class, Partitioner.class); 1151 } 1152 1153 /** 1154 * Set the {@link Partitioner} class used to partition 1155 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1156 * 1157 * @param theClass the {@link Partitioner} used to partition map-outputs. 1158 */ 1159 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1160 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1161 } 1162 1163 /** 1164 * Get the {@link Reducer} class for the job. 1165 * 1166 * @return the {@link Reducer} class for the job. 1167 */ 1168 public Class<? extends Reducer> getReducerClass() { 1169 return getClass("mapred.reducer.class", 1170 IdentityReducer.class, Reducer.class); 1171 } 1172 1173 /** 1174 * Set the {@link Reducer} class for the job. 1175 * 1176 * @param theClass the {@link Reducer} class for the job. 1177 */ 1178 public void setReducerClass(Class<? extends Reducer> theClass) { 1179 setClass("mapred.reducer.class", theClass, Reducer.class); 1180 } 1181 1182 /** 1183 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1184 * before being sent to the reducers. Typically the combiner is same as the 1185 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1186 * 1187 * @return the user-defined combiner class used to combine map-outputs. 1188 */ 1189 public Class<? extends Reducer> getCombinerClass() { 1190 return getClass("mapred.combiner.class", null, Reducer.class); 1191 } 1192 1193 /** 1194 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1195 * before being sent to the reducers. 1196 * 1197 * <p>The combiner is an application-specified aggregation operation, which 1198 * can help cut down the amount of data transferred between the 1199 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1200 * 1201 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1202 * the mapper and reducer tasks. In general, the combiner is called as the 1203 * sort/merge result is written to disk. The combiner must: 1204 * <ul> 1205 * <li> be side-effect free</li> 1206 * <li> have the same input and output key types and the same input and 1207 * output value types</li> 1208 * </ul></p> 1209 * 1210 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1211 * job i.e. {@link #setReducerClass(Class)}.</p> 1212 * 1213 * @param theClass the user-defined combiner class used to combine 1214 * map-outputs. 1215 */ 1216 public void setCombinerClass(Class<? extends Reducer> theClass) { 1217 setClass("mapred.combiner.class", theClass, Reducer.class); 1218 } 1219 1220 /** 1221 * Should speculative execution be used for this job? 1222 * Defaults to <code>true</code>. 1223 * 1224 * @return <code>true</code> if speculative execution be used for this job, 1225 * <code>false</code> otherwise. 1226 */ 1227 public boolean getSpeculativeExecution() { 1228 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1229 } 1230 1231 /** 1232 * Turn speculative execution on or off for this job. 1233 * 1234 * @param speculativeExecution <code>true</code> if speculative execution 1235 * should be turned on, else <code>false</code>. 1236 */ 1237 public void setSpeculativeExecution(boolean speculativeExecution) { 1238 setMapSpeculativeExecution(speculativeExecution); 1239 setReduceSpeculativeExecution(speculativeExecution); 1240 } 1241 1242 /** 1243 * Should speculative execution be used for this job for map tasks? 1244 * Defaults to <code>true</code>. 1245 * 1246 * @return <code>true</code> if speculative execution be 1247 * used for this job for map tasks, 1248 * <code>false</code> otherwise. 1249 */ 1250 public boolean getMapSpeculativeExecution() { 1251 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1252 } 1253 1254 /** 1255 * Turn speculative execution on or off for this job for map tasks. 1256 * 1257 * @param speculativeExecution <code>true</code> if speculative execution 1258 * should be turned on for map tasks, 1259 * else <code>false</code>. 1260 */ 1261 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1262 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1263 } 1264 1265 /** 1266 * Should speculative execution be used for this job for reduce tasks? 1267 * Defaults to <code>true</code>. 1268 * 1269 * @return <code>true</code> if speculative execution be used 1270 * for reduce tasks for this job, 1271 * <code>false</code> otherwise. 1272 */ 1273 public boolean getReduceSpeculativeExecution() { 1274 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1275 } 1276 1277 /** 1278 * Turn speculative execution on or off for this job for reduce tasks. 1279 * 1280 * @param speculativeExecution <code>true</code> if speculative execution 1281 * should be turned on for reduce tasks, 1282 * else <code>false</code>. 1283 */ 1284 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1285 setBoolean(JobContext.REDUCE_SPECULATIVE, 1286 speculativeExecution); 1287 } 1288 1289 /** 1290 * Get configured the number of reduce tasks for this job. 1291 * Defaults to <code>1</code>. 1292 * 1293 * @return the number of reduce tasks for this job. 1294 */ 1295 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1296 1297 /** 1298 * Set the number of map tasks for this job. 1299 * 1300 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1301 * number of spawned map tasks depends on the number of {@link InputSplit}s 1302 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1303 * 1304 * A custom {@link InputFormat} is typically used to accurately control 1305 * the number of map tasks for the job.</p> 1306 * 1307 * <h4 id="NoOfMaps">How many maps?</h4> 1308 * 1309 * <p>The number of maps is usually driven by the total size of the inputs 1310 * i.e. total number of blocks of the input files.</p> 1311 * 1312 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1313 * per-node, although it has been set up to 300 or so for very cpu-light map 1314 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1315 * minute to execute.</p> 1316 * 1317 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1318 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1319 * bytes, of input files. However, the {@link FileSystem} blocksize of the 1320 * input files is treated as an upper bound for input splits. A lower bound 1321 * on the split size can be set via 1322 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1323 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1324 * 1325 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1326 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1327 * used to set it even higher.</p> 1328 * 1329 * @param n the number of map tasks for this job. 1330 * @see InputFormat#getSplits(JobConf, int) 1331 * @see FileInputFormat 1332 * @see FileSystem#getDefaultBlockSize() 1333 * @see FileStatus#getBlockSize() 1334 */ 1335 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1336 1337 /** 1338 * Get configured the number of reduce tasks for this job. Defaults to 1339 * <code>1</code>. 1340 * 1341 * @return the number of reduce tasks for this job. 1342 */ 1343 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1344 1345 /** 1346 * Set the requisite number of reduce tasks for this job. 1347 * 1348 * <h4 id="NoOfReduces">How many reduces?</h4> 1349 * 1350 * <p>The right number of reduces seems to be <code>0.95</code> or 1351 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1352 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1353 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1354 * </p> 1355 * 1356 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1357 * start transfering map outputs as the maps finish. With <code>1.75</code> 1358 * the faster nodes will finish their first round of reduces and launch a 1359 * second wave of reduces doing a much better job of load balancing.</p> 1360 * 1361 * <p>Increasing the number of reduces increases the framework overhead, but 1362 * increases load balancing and lowers the cost of failures.</p> 1363 * 1364 * <p>The scaling factors above are slightly less than whole numbers to 1365 * reserve a few reduce slots in the framework for speculative-tasks, failures 1366 * etc.</p> 1367 * 1368 * <h4 id="ReducerNone">Reducer NONE</h4> 1369 * 1370 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1371 * 1372 * <p>In this case the output of the map-tasks directly go to distributed 1373 * file-system, to the path set by 1374 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1375 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1376 * 1377 * @param n the number of reduce tasks for this job. 1378 */ 1379 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1380 1381 /** 1382 * Get the configured number of maximum attempts that will be made to run a 1383 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1384 * property. If this property is not already set, the default is 4 attempts. 1385 * 1386 * @return the max number of attempts per map task. 1387 */ 1388 public int getMaxMapAttempts() { 1389 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1390 } 1391 1392 /** 1393 * Expert: Set the number of maximum attempts that will be made to run a 1394 * map task. 1395 * 1396 * @param n the number of attempts per map task. 1397 */ 1398 public void setMaxMapAttempts(int n) { 1399 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1400 } 1401 1402 /** 1403 * Get the configured number of maximum attempts that will be made to run a 1404 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1405 * property. If this property is not already set, the default is 4 attempts. 1406 * 1407 * @return the max number of attempts per reduce task. 1408 */ 1409 public int getMaxReduceAttempts() { 1410 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1411 } 1412 /** 1413 * Expert: Set the number of maximum attempts that will be made to run a 1414 * reduce task. 1415 * 1416 * @param n the number of attempts per reduce task. 1417 */ 1418 public void setMaxReduceAttempts(int n) { 1419 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1420 } 1421 1422 /** 1423 * Get the user-specified job name. This is only used to identify the 1424 * job to the user. 1425 * 1426 * @return the job's name, defaulting to "". 1427 */ 1428 public String getJobName() { 1429 return get(JobContext.JOB_NAME, ""); 1430 } 1431 1432 /** 1433 * Set the user-specified job name. 1434 * 1435 * @param name the job's new name. 1436 */ 1437 public void setJobName(String name) { 1438 set(JobContext.JOB_NAME, name); 1439 } 1440 1441 /** 1442 * Get the user-specified session identifier. The default is the empty string. 1443 * 1444 * The session identifier is used to tag metric data that is reported to some 1445 * performance metrics system via the org.apache.hadoop.metrics API. The 1446 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1447 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1448 * HOD will set the session identifier by modifying the mapred-site.xml file 1449 * before starting the cluster. 1450 * 1451 * When not running under HOD, this identifer is expected to remain set to 1452 * the empty string. 1453 * 1454 * @return the session identifier, defaulting to "". 1455 */ 1456 @Deprecated 1457 public String getSessionId() { 1458 return get("session.id", ""); 1459 } 1460 1461 /** 1462 * Set the user-specified session identifier. 1463 * 1464 * @param sessionId the new session id. 1465 */ 1466 @Deprecated 1467 public void setSessionId(String sessionId) { 1468 set("session.id", sessionId); 1469 } 1470 1471 /** 1472 * Set the maximum no. of failures of a given job per tasktracker. 1473 * If the no. of task failures exceeds <code>noFailures</code>, the 1474 * tasktracker is <i>blacklisted</i> for this job. 1475 * 1476 * @param noFailures maximum no. of failures of a given job per tasktracker. 1477 */ 1478 public void setMaxTaskFailuresPerTracker(int noFailures) { 1479 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1480 } 1481 1482 /** 1483 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1484 * If the no. of task failures exceeds this, the tasktracker is 1485 * <i>blacklisted</i> for this job. 1486 * 1487 * @return the maximum no. of failures of a given job per tasktracker. 1488 */ 1489 public int getMaxTaskFailuresPerTracker() { 1490 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1491 } 1492 1493 /** 1494 * Get the maximum percentage of map tasks that can fail without 1495 * the job being aborted. 1496 * 1497 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1498 * attempts before being declared as <i>failed</i>. 1499 * 1500 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1501 * the job being declared as {@link JobStatus#FAILED}. 1502 * 1503 * @return the maximum percentage of map tasks that can fail without 1504 * the job being aborted. 1505 */ 1506 public int getMaxMapTaskFailuresPercent() { 1507 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1508 } 1509 1510 /** 1511 * Expert: Set the maximum percentage of map tasks that can fail without the 1512 * job being aborted. 1513 * 1514 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1515 * before being declared as <i>failed</i>. 1516 * 1517 * @param percent the maximum percentage of map tasks that can fail without 1518 * the job being aborted. 1519 */ 1520 public void setMaxMapTaskFailuresPercent(int percent) { 1521 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1522 } 1523 1524 /** 1525 * Get the maximum percentage of reduce tasks that can fail without 1526 * the job being aborted. 1527 * 1528 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1529 * attempts before being declared as <i>failed</i>. 1530 * 1531 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1532 * in the job being declared as {@link JobStatus#FAILED}. 1533 * 1534 * @return the maximum percentage of reduce tasks that can fail without 1535 * the job being aborted. 1536 */ 1537 public int getMaxReduceTaskFailuresPercent() { 1538 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1539 } 1540 1541 /** 1542 * Set the maximum percentage of reduce tasks that can fail without the job 1543 * being aborted. 1544 * 1545 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1546 * attempts before being declared as <i>failed</i>. 1547 * 1548 * @param percent the maximum percentage of reduce tasks that can fail without 1549 * the job being aborted. 1550 */ 1551 public void setMaxReduceTaskFailuresPercent(int percent) { 1552 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1553 } 1554 1555 /** 1556 * Set {@link JobPriority} for this job. 1557 * 1558 * @param prio the {@link JobPriority} for this job. 1559 */ 1560 public void setJobPriority(JobPriority prio) { 1561 set(JobContext.PRIORITY, prio.toString()); 1562 } 1563 1564 /** 1565 * Get the {@link JobPriority} for this job. 1566 * 1567 * @return the {@link JobPriority} for this job. 1568 */ 1569 public JobPriority getJobPriority() { 1570 String prio = get(JobContext.PRIORITY); 1571 if(prio == null) { 1572 return JobPriority.NORMAL; 1573 } 1574 1575 return JobPriority.valueOf(prio); 1576 } 1577 1578 /** 1579 * Set JobSubmitHostName for this job. 1580 * 1581 * @param hostname the JobSubmitHostName for this job. 1582 */ 1583 void setJobSubmitHostName(String hostname) { 1584 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1585 } 1586 1587 /** 1588 * Get the JobSubmitHostName for this job. 1589 * 1590 * @return the JobSubmitHostName for this job. 1591 */ 1592 String getJobSubmitHostName() { 1593 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1594 1595 return hostname; 1596 } 1597 1598 /** 1599 * Set JobSubmitHostAddress for this job. 1600 * 1601 * @param hostadd the JobSubmitHostAddress for this job. 1602 */ 1603 void setJobSubmitHostAddress(String hostadd) { 1604 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1605 } 1606 1607 /** 1608 * Get JobSubmitHostAddress for this job. 1609 * 1610 * @return JobSubmitHostAddress for this job. 1611 */ 1612 String getJobSubmitHostAddress() { 1613 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1614 1615 return hostadd; 1616 } 1617 1618 /** 1619 * Get whether the task profiling is enabled. 1620 * @return true if some tasks will be profiled 1621 */ 1622 public boolean getProfileEnabled() { 1623 return getBoolean(JobContext.TASK_PROFILE, false); 1624 } 1625 1626 /** 1627 * Set whether the system should collect profiler information for some of 1628 * the tasks in this job? The information is stored in the user log 1629 * directory. 1630 * @param newValue true means it should be gathered 1631 */ 1632 public void setProfileEnabled(boolean newValue) { 1633 setBoolean(JobContext.TASK_PROFILE, newValue); 1634 } 1635 1636 /** 1637 * Get the profiler configuration arguments. 1638 * 1639 * The default value for this property is 1640 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1641 * 1642 * @return the parameters to pass to the task child to configure profiling 1643 */ 1644 public String getProfileParams() { 1645 return get(JobContext.TASK_PROFILE_PARAMS, 1646 MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS); 1647 } 1648 1649 /** 1650 * Set the profiler configuration arguments. If the string contains a '%s' it 1651 * will be replaced with the name of the profiling output file when the task 1652 * runs. 1653 * 1654 * This value is passed to the task child JVM on the command line. 1655 * 1656 * @param value the configuration string 1657 */ 1658 public void setProfileParams(String value) { 1659 set(JobContext.TASK_PROFILE_PARAMS, value); 1660 } 1661 1662 /** 1663 * Get the range of maps or reduces to profile. 1664 * @param isMap is the task a map? 1665 * @return the task ranges 1666 */ 1667 public IntegerRanges getProfileTaskRange(boolean isMap) { 1668 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1669 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1670 } 1671 1672 /** 1673 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1674 * must also be called. 1675 * @param newValue a set of integer ranges of the map ids 1676 */ 1677 public void setProfileTaskRange(boolean isMap, String newValue) { 1678 // parse the value to make sure it is legal 1679 new Configuration.IntegerRanges(newValue); 1680 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1681 newValue); 1682 } 1683 1684 /** 1685 * Set the debug script to run when the map tasks fail. 1686 * 1687 * <p>The debug script can aid debugging of failed map tasks. The script is 1688 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1689 * 1690 * <p>The debug command, run on the node where the map failed, is:</p> 1691 * <p><pre><blockquote> 1692 * $script $stdout $stderr $syslog $jobconf. 1693 * </blockquote></pre></p> 1694 * 1695 * <p> The script file is distributed through {@link DistributedCache} 1696 * APIs. The script needs to be symlinked. </p> 1697 * 1698 * <p>Here is an example on how to submit a script 1699 * <p><blockquote><pre> 1700 * job.setMapDebugScript("./myscript"); 1701 * DistributedCache.createSymlink(job); 1702 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1703 * </pre></blockquote></p> 1704 * 1705 * @param mDbgScript the script name 1706 */ 1707 public void setMapDebugScript(String mDbgScript) { 1708 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1709 } 1710 1711 /** 1712 * Get the map task's debug script. 1713 * 1714 * @return the debug Script for the mapred job for failed map tasks. 1715 * @see #setMapDebugScript(String) 1716 */ 1717 public String getMapDebugScript() { 1718 return get(JobContext.MAP_DEBUG_SCRIPT); 1719 } 1720 1721 /** 1722 * Set the debug script to run when the reduce tasks fail. 1723 * 1724 * <p>The debug script can aid debugging of failed reduce tasks. The script 1725 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1726 * 1727 * <p>The debug command, run on the node where the map failed, is:</p> 1728 * <p><pre><blockquote> 1729 * $script $stdout $stderr $syslog $jobconf. 1730 * </blockquote></pre></p> 1731 * 1732 * <p> The script file is distributed through {@link DistributedCache} 1733 * APIs. The script file needs to be symlinked </p> 1734 * 1735 * <p>Here is an example on how to submit a script 1736 * <p><blockquote><pre> 1737 * job.setReduceDebugScript("./myscript"); 1738 * DistributedCache.createSymlink(job); 1739 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1740 * </pre></blockquote></p> 1741 * 1742 * @param rDbgScript the script name 1743 */ 1744 public void setReduceDebugScript(String rDbgScript) { 1745 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1746 } 1747 1748 /** 1749 * Get the reduce task's debug Script 1750 * 1751 * @return the debug script for the mapred job for failed reduce tasks. 1752 * @see #setReduceDebugScript(String) 1753 */ 1754 public String getReduceDebugScript() { 1755 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1756 } 1757 1758 /** 1759 * Get the uri to be invoked in-order to send a notification after the job 1760 * has completed (success/failure). 1761 * 1762 * @return the job end notification uri, <code>null</code> if it hasn't 1763 * been set. 1764 * @see #setJobEndNotificationURI(String) 1765 */ 1766 public String getJobEndNotificationURI() { 1767 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1768 } 1769 1770 /** 1771 * Set the uri to be invoked in-order to send a notification after the job 1772 * has completed (success/failure). 1773 * 1774 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1775 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 1776 * identifier and completion-status respectively.</p> 1777 * 1778 * <p>This is typically used by application-writers to implement chaining of 1779 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1780 * 1781 * @param uri the job end notification uri 1782 * @see JobStatus 1783 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html# 1784 * JobCompletionAndChaining">Job Completion and Chaining</a> 1785 */ 1786 public void setJobEndNotificationURI(String uri) { 1787 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1788 } 1789 1790 /** 1791 * Get job-specific shared directory for use as scratch space 1792 * 1793 * <p> 1794 * When a job starts, a shared directory is created at location 1795 * <code> 1796 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1797 * This directory is exposed to the users through 1798 * <code>mapreduce.job.local.dir </code>. 1799 * So, the tasks can use this space 1800 * as scratch space and share files among them. </p> 1801 * This value is available as System property also. 1802 * 1803 * @return The localized job specific shared directory 1804 */ 1805 public String getJobLocalDir() { 1806 return get(JobContext.JOB_LOCAL_DIR); 1807 } 1808 1809 /** 1810 * Get memory required to run a map task of the job, in MB. 1811 * 1812 * If a value is specified in the configuration, it is returned. 1813 * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}. 1814 * <p/> 1815 * For backward compatibility, if the job configuration sets the 1816 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1817 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1818 * after converting it from bytes to MB. 1819 * @return memory required to run a map task of the job, in MB, 1820 */ 1821 public long getMemoryForMapTask() { 1822 long value = getDeprecatedMemoryValue(); 1823 if (value < 0) { 1824 return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 1825 JobContext.DEFAULT_MAP_MEMORY_MB); 1826 } 1827 return value; 1828 } 1829 1830 public void setMemoryForMapTask(long mem) { 1831 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1832 // In case that M/R 1.x applications use the old property name 1833 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1834 } 1835 1836 /** 1837 * Get memory required to run a reduce task of the job, in MB. 1838 * 1839 * If a value is specified in the configuration, it is returned. 1840 * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}. 1841 * <p/> 1842 * For backward compatibility, if the job configuration sets the 1843 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1844 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1845 * after converting it from bytes to MB. 1846 * @return memory required to run a reduce task of the job, in MB. 1847 */ 1848 public long getMemoryForReduceTask() { 1849 long value = getDeprecatedMemoryValue(); 1850 if (value < 0) { 1851 return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 1852 JobContext.DEFAULT_REDUCE_MEMORY_MB); 1853 } 1854 return value; 1855 } 1856 1857 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1858 // converted into MBs. 1859 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1860 // value. 1861 private long getDeprecatedMemoryValue() { 1862 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1863 DISABLED_MEMORY_LIMIT); 1864 if (oldValue > 0) { 1865 oldValue /= (1024*1024); 1866 } 1867 return oldValue; 1868 } 1869 1870 public void setMemoryForReduceTask(long mem) { 1871 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1872 // In case that M/R 1.x applications use the old property name 1873 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1874 } 1875 1876 /** 1877 * Return the name of the queue to which this job is submitted. 1878 * Defaults to 'default'. 1879 * 1880 * @return name of the queue 1881 */ 1882 public String getQueueName() { 1883 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1884 } 1885 1886 /** 1887 * Set the name of the queue to which this job should be submitted. 1888 * 1889 * @param queueName Name of the queue 1890 */ 1891 public void setQueueName(String queueName) { 1892 set(JobContext.QUEUE_NAME, queueName); 1893 } 1894 1895 /** 1896 * Normalize the negative values in configuration 1897 * 1898 * @param val 1899 * @return normalized value 1900 */ 1901 public static long normalizeMemoryConfigValue(long val) { 1902 if (val < 0) { 1903 val = DISABLED_MEMORY_LIMIT; 1904 } 1905 return val; 1906 } 1907 1908 /** 1909 * Find a jar that contains a class of the same name, if any. 1910 * It will return a jar file, even if that is not the first thing 1911 * on the class path that has a class with the same name. 1912 * 1913 * @param my_class the class to find. 1914 * @return a jar file that contains the class, or null. 1915 * @throws IOException 1916 */ 1917 public static String findContainingJar(Class my_class) { 1918 return ClassUtil.findContainingJar(my_class); 1919 } 1920 1921 /** 1922 * Get the memory required to run a task of this job, in bytes. See 1923 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1924 * <p/> 1925 * This method is deprecated. Now, different memory limits can be 1926 * set for map and reduce tasks of a job, in MB. 1927 * <p/> 1928 * For backward compatibility, if the job configuration sets the 1929 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 1930 * Otherwise, this method will return the larger of the values returned by 1931 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 1932 * after converting them into bytes. 1933 * 1934 * @return Memory required to run a task of this job, in bytes. 1935 * @see #setMaxVirtualMemoryForTask(long) 1936 * @deprecated Use {@link #getMemoryForMapTask()} and 1937 * {@link #getMemoryForReduceTask()} 1938 */ 1939 @Deprecated 1940 public long getMaxVirtualMemoryForTask() { 1941 LOG.warn( 1942 "getMaxVirtualMemoryForTask() is deprecated. " + 1943 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 1944 1945 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1946 Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024); 1947 return value; 1948 } 1949 1950 /** 1951 * Set the maximum amount of memory any task of this job can use. See 1952 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1953 * <p/> 1954 * mapred.task.maxvmem is split into 1955 * mapreduce.map.memory.mb 1956 * and mapreduce.map.memory.mb,mapred 1957 * each of the new key are set 1958 * as mapred.task.maxvmem / 1024 1959 * as new values are in MB 1960 * 1961 * @param vmem Maximum amount of virtual memory in bytes any task of this job 1962 * can use. 1963 * @see #getMaxVirtualMemoryForTask() 1964 * @deprecated 1965 * Use {@link #setMemoryForMapTask(long mem)} and 1966 * Use {@link #setMemoryForReduceTask(long mem)} 1967 */ 1968 @Deprecated 1969 public void setMaxVirtualMemoryForTask(long vmem) { 1970 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 1971 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 1972 if (vmem < 0) { 1973 throw new IllegalArgumentException("Task memory allocation may not be < 0"); 1974 } 1975 1976 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 1977 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 1978 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 1979 }else{ 1980 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 1981 } 1982 } 1983 1984 /** 1985 * @deprecated this variable is deprecated and nolonger in use. 1986 */ 1987 @Deprecated 1988 public long getMaxPhysicalMemoryForTask() { 1989 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 1990 + " Refer to the APIs getMemoryForMapTask() and" 1991 + " getMemoryForReduceTask() for details."); 1992 return -1; 1993 } 1994 1995 /* 1996 * @deprecated this 1997 */ 1998 @Deprecated 1999 public void setMaxPhysicalMemoryForTask(long mem) { 2000 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 2001 + " The value set is ignored. Refer to " 2002 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 2003 } 2004 2005 static String deprecatedString(String key) { 2006 return "The variable " + key + " is no longer used."; 2007 } 2008 2009 private void checkAndWarnDeprecation() { 2010 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 2011 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 2012 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY 2013 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY); 2014 } 2015 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 2016 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 2017 } 2018 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 2019 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 2020 } 2021 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 2022 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 2023 } 2024 } 2025 2026 2027 } 2028