001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021 022import java.io.IOException; 023import java.util.regex.Pattern; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceAudience.Private; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.io.LongWritable; 035import org.apache.hadoop.io.RawComparator; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.io.WritableComparable; 038import org.apache.hadoop.io.WritableComparator; 039import org.apache.hadoop.io.compress.CompressionCodec; 040import org.apache.hadoop.mapred.lib.HashPartitioner; 041import org.apache.hadoop.mapred.lib.IdentityMapper; 042import org.apache.hadoop.mapred.lib.IdentityReducer; 043import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 044import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 045import org.apache.hadoop.mapreduce.MRConfig; 046import org.apache.hadoop.mapreduce.MRJobConfig; 047import org.apache.hadoop.mapreduce.filecache.DistributedCache; 048import org.apache.hadoop.mapreduce.util.ConfigUtil; 049import org.apache.hadoop.security.Credentials; 050import org.apache.hadoop.util.ClassUtil; 051import org.apache.hadoop.util.ReflectionUtils; 052import org.apache.hadoop.util.Tool; 053import org.apache.log4j.Level; 054 055/** 056 * A map/reduce job configuration. 057 * 058 * <p><code>JobConf</code> is the primary interface for a user to describe a 059 * map-reduce job to the Hadoop framework for execution. The framework tries to 060 * faithfully execute the job as-is described by <code>JobConf</code>, however: 061 * <ol> 062 * <li> 063 * Some configuration parameters might have been marked as 064 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 065 * final</a> by administrators and hence cannot be altered. 066 * </li> 067 * <li> 068 * While some job parameters are straight-forward to set 069 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 070 * with the rest of the framework and/or job-configuration and is relatively 071 * more complex for the user to control finely 072 * (e.g. {@link #setNumMapTasks(int)}). 073 * </li> 074 * </ol> 075 * 076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 078 * {@link OutputFormat} implementations to be used etc. 079 * 080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 081 * of the job such as <code>Comparator</code>s to be used, files to be put in 082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 083 * are to be compressed (and how), debugability via user-provided scripts 084 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}), 085 * for doing post-processing on task logs, task's stdout, stderr, syslog. 086 * and etc.</p> 087 * 088 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p> 089 * <p><blockquote><pre> 090 * // Create a new JobConf 091 * JobConf job = new JobConf(new Configuration(), MyJob.class); 092 * 093 * // Specify various job-specific parameters 094 * job.setJobName("myjob"); 095 * 096 * FileInputFormat.setInputPaths(job, new Path("in")); 097 * FileOutputFormat.setOutputPath(job, new Path("out")); 098 * 099 * job.setMapperClass(MyJob.MyMapper.class); 100 * job.setCombinerClass(MyJob.MyReducer.class); 101 * job.setReducerClass(MyJob.MyReducer.class); 102 * 103 * job.setInputFormat(SequenceFileInputFormat.class); 104 * job.setOutputFormat(SequenceFileOutputFormat.class); 105 * </pre></blockquote> 106 * 107 * @see JobClient 108 * @see ClusterStatus 109 * @see Tool 110 * @see DistributedCache 111 */ 112@InterfaceAudience.Public 113@InterfaceStability.Stable 114public class JobConf extends Configuration { 115 116 private static final Log LOG = LogFactory.getLog(JobConf.class); 117 118 static{ 119 ConfigUtil.loadResources(); 120 } 121 122 /** 123 * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and 124 * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 125 */ 126 @Deprecated 127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY = 128 "mapred.task.maxvmem"; 129 130 /** 131 * @deprecated 132 */ 133 @Deprecated 134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = 135 "mapred.task.limit.maxvmem"; 136 137 /** 138 * @deprecated 139 */ 140 @Deprecated 141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = 142 "mapred.task.default.maxvmem"; 143 144 /** 145 * @deprecated 146 */ 147 @Deprecated 148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY = 149 "mapred.task.maxpmem"; 150 151 /** 152 * A value which if set for memory related configuration options, 153 * indicates that the options are turned off. 154 * Deprecated because it makes no sense in the context of MR2. 155 */ 156 @Deprecated 157 public static final long DISABLED_MEMORY_LIMIT = -1L; 158 159 /** 160 * Property name for the configuration property mapreduce.cluster.local.dir 161 */ 162 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR; 163 164 /** 165 * Name of the queue to which jobs will be submitted, if no queue 166 * name is mentioned. 167 */ 168 public static final String DEFAULT_QUEUE_NAME = "default"; 169 170 static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY = 171 JobContext.MAP_MEMORY_MB; 172 173 static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY = 174 JobContext.REDUCE_MEMORY_MB; 175 176 /** 177 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 178 * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} 179 */ 180 @Deprecated 181 public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 182 "mapred.job.map.memory.mb"; 183 184 /** 185 * The variable is kept for M/R 1.x applications, while M/R 2.x applications 186 * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY} 187 */ 188 @Deprecated 189 public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = 190 "mapred.job.reduce.memory.mb"; 191 192 /** Pattern for the default unpacking behavior for job jars */ 193 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = 194 Pattern.compile("(?:classes/|lib/).*"); 195 196 /** 197 * Configuration key to set the java command line options for the child 198 * map and reduce tasks. 199 * 200 * Java opts for the task tracker child processes. 201 * The following symbol, if present, will be interpolated: @taskid@. 202 * It is replaced by current TaskID. Any other occurrences of '@' will go 203 * unchanged. 204 * For example, to enable verbose gc logging to a file named for the taskid in 205 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 206 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 207 * 208 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 209 * other environment variables to the child processes. 210 * 211 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 212 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} 213 */ 214 @Deprecated 215 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; 216 217 /** 218 * Configuration key to set the java command line options for the map tasks. 219 * 220 * Java opts for the task tracker child map processes. 221 * The following symbol, if present, will be interpolated: @taskid@. 222 * It is replaced by current TaskID. Any other occurrences of '@' will go 223 * unchanged. 224 * For example, to enable verbose gc logging to a file named for the taskid in 225 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 226 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 227 * 228 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 229 * other environment variables to the map processes. 230 */ 231 public static final String MAPRED_MAP_TASK_JAVA_OPTS = 232 JobContext.MAP_JAVA_OPTS; 233 234 /** 235 * Configuration key to set the java command line options for the reduce tasks. 236 * 237 * Java opts for the task tracker child reduce processes. 238 * The following symbol, if present, will be interpolated: @taskid@. 239 * It is replaced by current TaskID. Any other occurrences of '@' will go 240 * unchanged. 241 * For example, to enable verbose gc logging to a file named for the taskid in 242 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 243 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 244 * 245 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 246 * pass process environment variables to the reduce processes. 247 */ 248 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 249 JobContext.REDUCE_JAVA_OPTS; 250 251 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m"; 252 253 /** 254 * @deprecated 255 * Configuration key to set the maximum virtual memory available to the child 256 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no 257 * longer have any effect. 258 */ 259 @Deprecated 260 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; 261 262 /** 263 * @deprecated 264 * Configuration key to set the maximum virtual memory available to the 265 * map tasks (in kilo-bytes). This has been deprecated and will no 266 * longer have any effect. 267 */ 268 @Deprecated 269 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit"; 270 271 /** 272 * @deprecated 273 * Configuration key to set the maximum virtual memory available to the 274 * reduce tasks (in kilo-bytes). This has been deprecated and will no 275 * longer have any effect. 276 */ 277 @Deprecated 278 public static final String MAPRED_REDUCE_TASK_ULIMIT = 279 "mapreduce.reduce.ulimit"; 280 281 282 /** 283 * Configuration key to set the environment of the child map/reduce tasks. 284 * 285 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 286 * reference existing environment variables via <code>$key</code> on 287 * Linux or <code>%key%</code> on Windows. 288 * 289 * Example: 290 * <ul> 291 * <li> A=foo - This will set the env variable A to foo. </li> 292 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 293 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 294 * </ul> 295 * 296 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 297 * {@link #MAPRED_REDUCE_TASK_ENV} 298 */ 299 @Deprecated 300 public static final String MAPRED_TASK_ENV = "mapred.child.env"; 301 302 /** 303 * Configuration key to set the environment of the child map tasks. 304 * 305 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 306 * reference existing environment variables via <code>$key</code> on 307 * Linux or <code>%key%</code> on Windows. 308 * 309 * Example: 310 * <ul> 311 * <li> A=foo - This will set the env variable A to foo. </li> 312 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 313 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 314 * </ul> 315 */ 316 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV; 317 318 /** 319 * Configuration key to set the environment of the child reduce tasks. 320 * 321 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 322 * reference existing environment variables via <code>$key</code> on 323 * Linux or <code>%key%</code> on Windows. 324 * 325 * Example: 326 * <ul> 327 * <li> A=foo - This will set the env variable A to foo. </li> 328 * <li> B=$X:c This is inherit tasktracker's X env variable on Linux. </li> 329 * <li> B=%X%;c This is inherit tasktracker's X env variable on Windows. </li> 330 * </ul> 331 */ 332 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV; 333 334 private Credentials credentials = new Credentials(); 335 336 /** 337 * Configuration key to set the logging {@link Level} for the map task. 338 * 339 * The allowed logging levels are: 340 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 341 */ 342 public static final String MAPRED_MAP_TASK_LOG_LEVEL = 343 JobContext.MAP_LOG_LEVEL; 344 345 /** 346 * Configuration key to set the logging {@link Level} for the reduce task. 347 * 348 * The allowed logging levels are: 349 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 350 */ 351 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 352 JobContext.REDUCE_LOG_LEVEL; 353 354 /** 355 * Default logging level for map/reduce tasks. 356 */ 357 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 358 359 /** 360 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 361 * use {@link MRJobConfig#WORKFLOW_ID} instead 362 */ 363 @Deprecated 364 public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID; 365 366 /** 367 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 368 * use {@link MRJobConfig#WORKFLOW_NAME} instead 369 */ 370 @Deprecated 371 public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME; 372 373 /** 374 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 375 * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead 376 */ 377 @Deprecated 378 public static final String WORKFLOW_NODE_NAME = 379 MRJobConfig.WORKFLOW_NODE_NAME; 380 381 /** 382 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 383 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead 384 */ 385 @Deprecated 386 public static final String WORKFLOW_ADJACENCY_PREFIX_STRING = 387 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING; 388 389 /** 390 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 391 * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead 392 */ 393 @Deprecated 394 public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = 395 MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN; 396 397 /** 398 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 399 * use {@link MRJobConfig#WORKFLOW_TAGS} instead 400 */ 401 @Deprecated 402 public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS; 403 404 /** 405 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 406 * not use it 407 */ 408 @Deprecated 409 public static final String MAPREDUCE_RECOVER_JOB = 410 "mapreduce.job.restart.recover"; 411 412 /** 413 * The variable is kept for M/R 1.x applications, M/R 2.x applications should 414 * not use it 415 */ 416 @Deprecated 417 public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true; 418 419 /** 420 * Construct a map/reduce job configuration. 421 */ 422 public JobConf() { 423 checkAndWarnDeprecation(); 424 } 425 426 /** 427 * Construct a map/reduce job configuration. 428 * 429 * @param exampleClass a class whose containing jar is used as the job's jar. 430 */ 431 public JobConf(Class exampleClass) { 432 setJarByClass(exampleClass); 433 checkAndWarnDeprecation(); 434 } 435 436 /** 437 * Construct a map/reduce job configuration. 438 * 439 * @param conf a Configuration whose settings will be inherited. 440 */ 441 public JobConf(Configuration conf) { 442 super(conf); 443 444 if (conf instanceof JobConf) { 445 JobConf that = (JobConf)conf; 446 credentials = that.credentials; 447 } 448 449 checkAndWarnDeprecation(); 450 } 451 452 453 /** Construct a map/reduce job configuration. 454 * 455 * @param conf a Configuration whose settings will be inherited. 456 * @param exampleClass a class whose containing jar is used as the job's jar. 457 */ 458 public JobConf(Configuration conf, Class exampleClass) { 459 this(conf); 460 setJarByClass(exampleClass); 461 } 462 463 464 /** Construct a map/reduce configuration. 465 * 466 * @param config a Configuration-format XML job description file. 467 */ 468 public JobConf(String config) { 469 this(new Path(config)); 470 } 471 472 /** Construct a map/reduce configuration. 473 * 474 * @param config a Configuration-format XML job description file. 475 */ 476 public JobConf(Path config) { 477 super(); 478 addResource(config); 479 checkAndWarnDeprecation(); 480 } 481 482 /** A new map/reduce configuration where the behavior of reading from the 483 * default resources can be turned off. 484 * <p> 485 * If the parameter {@code loadDefaults} is false, the new instance 486 * will not load resources from the default files. 487 * 488 * @param loadDefaults specifies whether to load from the default files 489 */ 490 public JobConf(boolean loadDefaults) { 491 super(loadDefaults); 492 checkAndWarnDeprecation(); 493 } 494 495 /** 496 * Get credentials for the job. 497 * @return credentials for the job 498 */ 499 public Credentials getCredentials() { 500 return credentials; 501 } 502 503 @Private 504 public void setCredentials(Credentials credentials) { 505 this.credentials = credentials; 506 } 507 508 /** 509 * Get the user jar for the map-reduce job. 510 * 511 * @return the user jar for the map-reduce job. 512 */ 513 public String getJar() { return get(JobContext.JAR); } 514 515 /** 516 * Set the user jar for the map-reduce job. 517 * 518 * @param jar the user jar for the map-reduce job. 519 */ 520 public void setJar(String jar) { set(JobContext.JAR, jar); } 521 522 /** 523 * Get the pattern for jar contents to unpack on the tasktracker 524 */ 525 public Pattern getJarUnpackPattern() { 526 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 527 } 528 529 530 /** 531 * Set the job's jar file by finding an example class location. 532 * 533 * @param cls the example class. 534 */ 535 public void setJarByClass(Class cls) { 536 String jar = ClassUtil.findContainingJar(cls); 537 if (jar != null) { 538 setJar(jar); 539 } 540 } 541 542 public String[] getLocalDirs() throws IOException { 543 return getTrimmedStrings(MRConfig.LOCAL_DIR); 544 } 545 546 /** 547 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 548 */ 549 @Deprecated 550 public void deleteLocalFiles() throws IOException { 551 String[] localDirs = getLocalDirs(); 552 for (int i = 0; i < localDirs.length; i++) { 553 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 554 } 555 } 556 557 public void deleteLocalFiles(String subdir) throws IOException { 558 String[] localDirs = getLocalDirs(); 559 for (int i = 0; i < localDirs.length; i++) { 560 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 561 } 562 } 563 564 /** 565 * Constructs a local file name. Files are distributed among configured 566 * local directories. 567 */ 568 public Path getLocalPath(String pathString) throws IOException { 569 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 570 } 571 572 /** 573 * Get the reported username for this job. 574 * 575 * @return the username 576 */ 577 public String getUser() { 578 return get(JobContext.USER_NAME); 579 } 580 581 /** 582 * Set the reported username for this job. 583 * 584 * @param user the username for this job. 585 */ 586 public void setUser(String user) { 587 set(JobContext.USER_NAME, user); 588 } 589 590 591 592 /** 593 * Set whether the framework should keep the intermediate files for 594 * failed tasks. 595 * 596 * @param keep <code>true</code> if framework should keep the intermediate files 597 * for failed tasks, <code>false</code> otherwise. 598 * 599 */ 600 public void setKeepFailedTaskFiles(boolean keep) { 601 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 602 } 603 604 /** 605 * Should the temporary files for failed tasks be kept? 606 * 607 * @return should the files be kept? 608 */ 609 public boolean getKeepFailedTaskFiles() { 610 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 611 } 612 613 /** 614 * Set a regular expression for task names that should be kept. 615 * The regular expression ".*_m_000123_0" would keep the files 616 * for the first instance of map 123 that ran. 617 * 618 * @param pattern the java.util.regex.Pattern to match against the 619 * task names. 620 */ 621 public void setKeepTaskFilesPattern(String pattern) { 622 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 623 } 624 625 /** 626 * Get the regular expression that is matched against the task names 627 * to see if we need to keep the files. 628 * 629 * @return the pattern as a string, if it was set, othewise null. 630 */ 631 public String getKeepTaskFilesPattern() { 632 return get(JobContext.PRESERVE_FILES_PATTERN); 633 } 634 635 /** 636 * Set the current working directory for the default file system. 637 * 638 * @param dir the new current working directory. 639 */ 640 public void setWorkingDirectory(Path dir) { 641 dir = new Path(getWorkingDirectory(), dir); 642 set(JobContext.WORKING_DIR, dir.toString()); 643 } 644 645 /** 646 * Get the current working directory for the default file system. 647 * 648 * @return the directory name. 649 */ 650 public Path getWorkingDirectory() { 651 String name = get(JobContext.WORKING_DIR); 652 if (name != null) { 653 return new Path(name); 654 } else { 655 try { 656 Path dir = FileSystem.get(this).getWorkingDirectory(); 657 set(JobContext.WORKING_DIR, dir.toString()); 658 return dir; 659 } catch (IOException e) { 660 throw new RuntimeException(e); 661 } 662 } 663 } 664 665 /** 666 * Sets the number of tasks that a spawned task JVM should run 667 * before it exits 668 * @param numTasks the number of tasks to execute; defaults to 1; 669 * -1 signifies no limit 670 */ 671 public void setNumTasksToExecutePerJvm(int numTasks) { 672 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 673 } 674 675 /** 676 * Get the number of tasks that a spawned JVM should execute 677 */ 678 public int getNumTasksToExecutePerJvm() { 679 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 680 } 681 682 /** 683 * Get the {@link InputFormat} implementation for the map-reduce job, 684 * defaults to {@link TextInputFormat} if not specified explicity. 685 * 686 * @return the {@link InputFormat} implementation for the map-reduce job. 687 */ 688 public InputFormat getInputFormat() { 689 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 690 TextInputFormat.class, 691 InputFormat.class), 692 this); 693 } 694 695 /** 696 * Set the {@link InputFormat} implementation for the map-reduce job. 697 * 698 * @param theClass the {@link InputFormat} implementation for the map-reduce 699 * job. 700 */ 701 public void setInputFormat(Class<? extends InputFormat> theClass) { 702 setClass("mapred.input.format.class", theClass, InputFormat.class); 703 } 704 705 /** 706 * Get the {@link OutputFormat} implementation for the map-reduce job, 707 * defaults to {@link TextOutputFormat} if not specified explicity. 708 * 709 * @return the {@link OutputFormat} implementation for the map-reduce job. 710 */ 711 public OutputFormat getOutputFormat() { 712 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 713 TextOutputFormat.class, 714 OutputFormat.class), 715 this); 716 } 717 718 /** 719 * Get the {@link OutputCommitter} implementation for the map-reduce job, 720 * defaults to {@link FileOutputCommitter} if not specified explicitly. 721 * 722 * @return the {@link OutputCommitter} implementation for the map-reduce job. 723 */ 724 public OutputCommitter getOutputCommitter() { 725 return (OutputCommitter)ReflectionUtils.newInstance( 726 getClass("mapred.output.committer.class", FileOutputCommitter.class, 727 OutputCommitter.class), this); 728 } 729 730 /** 731 * Set the {@link OutputCommitter} implementation for the map-reduce job. 732 * 733 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 734 * job. 735 */ 736 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 737 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 738 } 739 740 /** 741 * Set the {@link OutputFormat} implementation for the map-reduce job. 742 * 743 * @param theClass the {@link OutputFormat} implementation for the map-reduce 744 * job. 745 */ 746 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 747 setClass("mapred.output.format.class", theClass, OutputFormat.class); 748 } 749 750 /** 751 * Should the map outputs be compressed before transfer? 752 * 753 * @param compress should the map outputs be compressed? 754 */ 755 public void setCompressMapOutput(boolean compress) { 756 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 757 } 758 759 /** 760 * Are the outputs of the maps be compressed? 761 * 762 * @return <code>true</code> if the outputs of the maps are to be compressed, 763 * <code>false</code> otherwise. 764 */ 765 public boolean getCompressMapOutput() { 766 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 767 } 768 769 /** 770 * Set the given class as the {@link CompressionCodec} for the map outputs. 771 * 772 * @param codecClass the {@link CompressionCodec} class that will compress 773 * the map outputs. 774 */ 775 public void 776 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 777 setCompressMapOutput(true); 778 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 779 CompressionCodec.class); 780 } 781 782 /** 783 * Get the {@link CompressionCodec} for compressing the map outputs. 784 * 785 * @param defaultValue the {@link CompressionCodec} to return if not set 786 * @return the {@link CompressionCodec} class that should be used to compress the 787 * map outputs. 788 * @throws IllegalArgumentException if the class was specified, but not found 789 */ 790 public Class<? extends CompressionCodec> 791 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 792 Class<? extends CompressionCodec> codecClass = defaultValue; 793 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 794 if (name != null) { 795 try { 796 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 797 } catch (ClassNotFoundException e) { 798 throw new IllegalArgumentException("Compression codec " + name + 799 " was not found.", e); 800 } 801 } 802 return codecClass; 803 } 804 805 /** 806 * Get the key class for the map output data. If it is not set, use the 807 * (final) output key class. This allows the map output key class to be 808 * different than the final output key class. 809 * 810 * @return the map output key class. 811 */ 812 public Class<?> getMapOutputKeyClass() { 813 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 814 if (retv == null) { 815 retv = getOutputKeyClass(); 816 } 817 return retv; 818 } 819 820 /** 821 * Set the key class for the map output data. This allows the user to 822 * specify the map output key class to be different than the final output 823 * value class. 824 * 825 * @param theClass the map output key class. 826 */ 827 public void setMapOutputKeyClass(Class<?> theClass) { 828 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 829 } 830 831 /** 832 * Get the value class for the map output data. If it is not set, use the 833 * (final) output value class This allows the map output value class to be 834 * different than the final output value class. 835 * 836 * @return the map output value class. 837 */ 838 public Class<?> getMapOutputValueClass() { 839 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 840 Object.class); 841 if (retv == null) { 842 retv = getOutputValueClass(); 843 } 844 return retv; 845 } 846 847 /** 848 * Set the value class for the map output data. This allows the user to 849 * specify the map output value class to be different than the final output 850 * value class. 851 * 852 * @param theClass the map output value class. 853 */ 854 public void setMapOutputValueClass(Class<?> theClass) { 855 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 856 } 857 858 /** 859 * Get the key class for the job output data. 860 * 861 * @return the key class for the job output data. 862 */ 863 public Class<?> getOutputKeyClass() { 864 return getClass(JobContext.OUTPUT_KEY_CLASS, 865 LongWritable.class, Object.class); 866 } 867 868 /** 869 * Set the key class for the job output data. 870 * 871 * @param theClass the key class for the job output data. 872 */ 873 public void setOutputKeyClass(Class<?> theClass) { 874 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 875 } 876 877 /** 878 * Get the {@link RawComparator} comparator used to compare keys. 879 * 880 * @return the {@link RawComparator} comparator used to compare keys. 881 */ 882 public RawComparator getOutputKeyComparator() { 883 Class<? extends RawComparator> theClass = getClass( 884 JobContext.KEY_COMPARATOR, null, RawComparator.class); 885 if (theClass != null) 886 return ReflectionUtils.newInstance(theClass, this); 887 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); 888 } 889 890 /** 891 * Set the {@link RawComparator} comparator used to compare keys. 892 * 893 * @param theClass the {@link RawComparator} comparator used to 894 * compare keys. 895 * @see #setOutputValueGroupingComparator(Class) 896 */ 897 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 898 setClass(JobContext.KEY_COMPARATOR, 899 theClass, RawComparator.class); 900 } 901 902 /** 903 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 904 * 905 * @param keySpec the key specification of the form -k pos1[,pos2], where, 906 * pos is of the form f[.c][opts], where f is the number 907 * of the key field to use, and c is the number of the first character from 908 * the beginning of the field. Fields and character posns are numbered 909 * starting with 1; a character position of zero in pos2 indicates the 910 * field's last character. If '.c' is omitted from pos1, it defaults to 1 911 * (the beginning of the field); if omitted from pos2, it defaults to 0 912 * (the end of the field). opts are ordering options. The supported options 913 * are: 914 * -n, (Sort numerically) 915 * -r, (Reverse the result of comparison) 916 */ 917 public void setKeyFieldComparatorOptions(String keySpec) { 918 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 919 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 920 } 921 922 /** 923 * Get the {@link KeyFieldBasedComparator} options 924 */ 925 public String getKeyFieldComparatorOption() { 926 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 927 } 928 929 /** 930 * Set the {@link KeyFieldBasedPartitioner} options used for 931 * {@link Partitioner} 932 * 933 * @param keySpec the key specification of the form -k pos1[,pos2], where, 934 * pos is of the form f[.c][opts], where f is the number 935 * of the key field to use, and c is the number of the first character from 936 * the beginning of the field. Fields and character posns are numbered 937 * starting with 1; a character position of zero in pos2 indicates the 938 * field's last character. If '.c' is omitted from pos1, it defaults to 1 939 * (the beginning of the field); if omitted from pos2, it defaults to 0 940 * (the end of the field). 941 */ 942 public void setKeyFieldPartitionerOptions(String keySpec) { 943 setPartitionerClass(KeyFieldBasedPartitioner.class); 944 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 945 } 946 947 /** 948 * Get the {@link KeyFieldBasedPartitioner} options 949 */ 950 public String getKeyFieldPartitionerOption() { 951 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 952 } 953 954 /** 955 * Get the user defined {@link WritableComparable} comparator for 956 * grouping keys of inputs to the combiner. 957 * 958 * @return comparator set by the user for grouping values. 959 * @see #setCombinerKeyGroupingComparator(Class) for details. 960 */ 961 public RawComparator getCombinerKeyGroupingComparator() { 962 Class<? extends RawComparator> theClass = getClass( 963 JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); 964 if (theClass == null) { 965 return getOutputKeyComparator(); 966 } 967 968 return ReflectionUtils.newInstance(theClass, this); 969 } 970 971 /** 972 * Get the user defined {@link WritableComparable} comparator for 973 * grouping keys of inputs to the reduce. 974 * 975 * @return comparator set by the user for grouping values. 976 * @see #setOutputValueGroupingComparator(Class) for details. 977 */ 978 public RawComparator getOutputValueGroupingComparator() { 979 Class<? extends RawComparator> theClass = getClass( 980 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 981 if (theClass == null) { 982 return getOutputKeyComparator(); 983 } 984 985 return ReflectionUtils.newInstance(theClass, this); 986 } 987 988 /** 989 * Set the user defined {@link RawComparator} comparator for 990 * grouping keys in the input to the combiner. 991 * 992 * <p>This comparator should be provided if the equivalence rules for keys 993 * for sorting the intermediates are different from those for grouping keys 994 * before each call to 995 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 996 * 997 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 998 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 999 * 1000 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1001 * how keys are sorted, this can be used in conjunction to simulate 1002 * <i>secondary sort on values</i>.</p> 1003 * 1004 * <p><i>Note</i>: This is not a guarantee of the combiner sort being 1005 * <i>stable</i> in any sense. (In any case, with the order of available 1006 * map-outputs to the combiner being non-deterministic, it wouldn't make 1007 * that much sense.)</p> 1008 * 1009 * @param theClass the comparator class to be used for grouping keys for the 1010 * combiner. It should implement <code>RawComparator</code>. 1011 * @see #setOutputKeyComparatorClass(Class) 1012 */ 1013 public void setCombinerKeyGroupingComparator( 1014 Class<? extends RawComparator> theClass) { 1015 setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, 1016 theClass, RawComparator.class); 1017 } 1018 1019 /** 1020 * Set the user defined {@link RawComparator} comparator for 1021 * grouping keys in the input to the reduce. 1022 * 1023 * <p>This comparator should be provided if the equivalence rules for keys 1024 * for sorting the intermediates are different from those for grouping keys 1025 * before each call to 1026 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 1027 * 1028 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 1029 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 1030 * 1031 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 1032 * how keys are sorted, this can be used in conjunction to simulate 1033 * <i>secondary sort on values</i>.</p> 1034 * 1035 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 1036 * <i>stable</i> in any sense. (In any case, with the order of available 1037 * map-outputs to the reduce being non-deterministic, it wouldn't make 1038 * that much sense.)</p> 1039 * 1040 * @param theClass the comparator class to be used for grouping keys. 1041 * It should implement <code>RawComparator</code>. 1042 * @see #setOutputKeyComparatorClass(Class) 1043 * @see #setCombinerKeyGroupingComparator(Class) 1044 */ 1045 public void setOutputValueGroupingComparator( 1046 Class<? extends RawComparator> theClass) { 1047 setClass(JobContext.GROUP_COMPARATOR_CLASS, 1048 theClass, RawComparator.class); 1049 } 1050 1051 /** 1052 * Should the framework use the new context-object code for running 1053 * the mapper? 1054 * @return true, if the new api should be used 1055 */ 1056 public boolean getUseNewMapper() { 1057 return getBoolean("mapred.mapper.new-api", false); 1058 } 1059 /** 1060 * Set whether the framework should use the new api for the mapper. 1061 * This is the default for jobs submitted with the new Job api. 1062 * @param flag true, if the new api should be used 1063 */ 1064 public void setUseNewMapper(boolean flag) { 1065 setBoolean("mapred.mapper.new-api", flag); 1066 } 1067 1068 /** 1069 * Should the framework use the new context-object code for running 1070 * the reducer? 1071 * @return true, if the new api should be used 1072 */ 1073 public boolean getUseNewReducer() { 1074 return getBoolean("mapred.reducer.new-api", false); 1075 } 1076 /** 1077 * Set whether the framework should use the new api for the reducer. 1078 * This is the default for jobs submitted with the new Job api. 1079 * @param flag true, if the new api should be used 1080 */ 1081 public void setUseNewReducer(boolean flag) { 1082 setBoolean("mapred.reducer.new-api", flag); 1083 } 1084 1085 /** 1086 * Get the value class for job outputs. 1087 * 1088 * @return the value class for job outputs. 1089 */ 1090 public Class<?> getOutputValueClass() { 1091 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 1092 } 1093 1094 /** 1095 * Set the value class for job outputs. 1096 * 1097 * @param theClass the value class for job outputs. 1098 */ 1099 public void setOutputValueClass(Class<?> theClass) { 1100 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 1101 } 1102 1103 /** 1104 * Get the {@link Mapper} class for the job. 1105 * 1106 * @return the {@link Mapper} class for the job. 1107 */ 1108 public Class<? extends Mapper> getMapperClass() { 1109 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 1110 } 1111 1112 /** 1113 * Set the {@link Mapper} class for the job. 1114 * 1115 * @param theClass the {@link Mapper} class for the job. 1116 */ 1117 public void setMapperClass(Class<? extends Mapper> theClass) { 1118 setClass("mapred.mapper.class", theClass, Mapper.class); 1119 } 1120 1121 /** 1122 * Get the {@link MapRunnable} class for the job. 1123 * 1124 * @return the {@link MapRunnable} class for the job. 1125 */ 1126 public Class<? extends MapRunnable> getMapRunnerClass() { 1127 return getClass("mapred.map.runner.class", 1128 MapRunner.class, MapRunnable.class); 1129 } 1130 1131 /** 1132 * Expert: Set the {@link MapRunnable} class for the job. 1133 * 1134 * Typically used to exert greater control on {@link Mapper}s. 1135 * 1136 * @param theClass the {@link MapRunnable} class for the job. 1137 */ 1138 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1139 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1140 } 1141 1142 /** 1143 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1144 * to be sent to the {@link Reducer}s. 1145 * 1146 * @return the {@link Partitioner} used to partition map-outputs. 1147 */ 1148 public Class<? extends Partitioner> getPartitionerClass() { 1149 return getClass("mapred.partitioner.class", 1150 HashPartitioner.class, Partitioner.class); 1151 } 1152 1153 /** 1154 * Set the {@link Partitioner} class used to partition 1155 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1156 * 1157 * @param theClass the {@link Partitioner} used to partition map-outputs. 1158 */ 1159 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1160 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1161 } 1162 1163 /** 1164 * Get the {@link Reducer} class for the job. 1165 * 1166 * @return the {@link Reducer} class for the job. 1167 */ 1168 public Class<? extends Reducer> getReducerClass() { 1169 return getClass("mapred.reducer.class", 1170 IdentityReducer.class, Reducer.class); 1171 } 1172 1173 /** 1174 * Set the {@link Reducer} class for the job. 1175 * 1176 * @param theClass the {@link Reducer} class for the job. 1177 */ 1178 public void setReducerClass(Class<? extends Reducer> theClass) { 1179 setClass("mapred.reducer.class", theClass, Reducer.class); 1180 } 1181 1182 /** 1183 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1184 * before being sent to the reducers. Typically the combiner is same as the 1185 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1186 * 1187 * @return the user-defined combiner class used to combine map-outputs. 1188 */ 1189 public Class<? extends Reducer> getCombinerClass() { 1190 return getClass("mapred.combiner.class", null, Reducer.class); 1191 } 1192 1193 /** 1194 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1195 * before being sent to the reducers. 1196 * 1197 * <p>The combiner is an application-specified aggregation operation, which 1198 * can help cut down the amount of data transferred between the 1199 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1200 * 1201 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1202 * the mapper and reducer tasks. In general, the combiner is called as the 1203 * sort/merge result is written to disk. The combiner must: 1204 * <ul> 1205 * <li> be side-effect free</li> 1206 * <li> have the same input and output key types and the same input and 1207 * output value types</li> 1208 * </ul> 1209 * 1210 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1211 * job i.e. {@link #setReducerClass(Class)}.</p> 1212 * 1213 * @param theClass the user-defined combiner class used to combine 1214 * map-outputs. 1215 */ 1216 public void setCombinerClass(Class<? extends Reducer> theClass) { 1217 setClass("mapred.combiner.class", theClass, Reducer.class); 1218 } 1219 1220 /** 1221 * Should speculative execution be used for this job? 1222 * Defaults to <code>true</code>. 1223 * 1224 * @return <code>true</code> if speculative execution be used for this job, 1225 * <code>false</code> otherwise. 1226 */ 1227 public boolean getSpeculativeExecution() { 1228 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1229 } 1230 1231 /** 1232 * Turn speculative execution on or off for this job. 1233 * 1234 * @param speculativeExecution <code>true</code> if speculative execution 1235 * should be turned on, else <code>false</code>. 1236 */ 1237 public void setSpeculativeExecution(boolean speculativeExecution) { 1238 setMapSpeculativeExecution(speculativeExecution); 1239 setReduceSpeculativeExecution(speculativeExecution); 1240 } 1241 1242 /** 1243 * Should speculative execution be used for this job for map tasks? 1244 * Defaults to <code>true</code>. 1245 * 1246 * @return <code>true</code> if speculative execution be 1247 * used for this job for map tasks, 1248 * <code>false</code> otherwise. 1249 */ 1250 public boolean getMapSpeculativeExecution() { 1251 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1252 } 1253 1254 /** 1255 * Turn speculative execution on or off for this job for map tasks. 1256 * 1257 * @param speculativeExecution <code>true</code> if speculative execution 1258 * should be turned on for map tasks, 1259 * else <code>false</code>. 1260 */ 1261 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1262 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1263 } 1264 1265 /** 1266 * Should speculative execution be used for this job for reduce tasks? 1267 * Defaults to <code>true</code>. 1268 * 1269 * @return <code>true</code> if speculative execution be used 1270 * for reduce tasks for this job, 1271 * <code>false</code> otherwise. 1272 */ 1273 public boolean getReduceSpeculativeExecution() { 1274 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1275 } 1276 1277 /** 1278 * Turn speculative execution on or off for this job for reduce tasks. 1279 * 1280 * @param speculativeExecution <code>true</code> if speculative execution 1281 * should be turned on for reduce tasks, 1282 * else <code>false</code>. 1283 */ 1284 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1285 setBoolean(JobContext.REDUCE_SPECULATIVE, 1286 speculativeExecution); 1287 } 1288 1289 /** 1290 * Get configured the number of reduce tasks for this job. 1291 * Defaults to <code>1</code>. 1292 * 1293 * @return the number of reduce tasks for this job. 1294 */ 1295 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1296 1297 /** 1298 * Set the number of map tasks for this job. 1299 * 1300 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1301 * number of spawned map tasks depends on the number of {@link InputSplit}s 1302 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1303 * 1304 * A custom {@link InputFormat} is typically used to accurately control 1305 * the number of map tasks for the job.</p> 1306 * 1307 * <b id="NoOfMaps">How many maps?</b> 1308 * 1309 * <p>The number of maps is usually driven by the total size of the inputs 1310 * i.e. total number of blocks of the input files.</p> 1311 * 1312 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1313 * per-node, although it has been set up to 300 or so for very cpu-light map 1314 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1315 * minute to execute.</p> 1316 * 1317 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1318 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1319 * bytes, of input files. However, the {@link FileSystem} blocksize of the 1320 * input files is treated as an upper bound for input splits. A lower bound 1321 * on the split size can be set via 1322 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize"> 1323 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1324 * 1325 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1326 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1327 * used to set it even higher.</p> 1328 * 1329 * @param n the number of map tasks for this job. 1330 * @see InputFormat#getSplits(JobConf, int) 1331 * @see FileInputFormat 1332 * @see FileSystem#getDefaultBlockSize() 1333 * @see FileStatus#getBlockSize() 1334 */ 1335 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1336 1337 /** 1338 * Get configured the number of reduce tasks for this job. Defaults to 1339 * <code>1</code>. 1340 * 1341 * @return the number of reduce tasks for this job. 1342 */ 1343 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1344 1345 /** 1346 * Set the requisite number of reduce tasks for this job. 1347 * 1348 * <b id="NoOfReduces">How many reduces?</b> 1349 * 1350 * <p>The right number of reduces seems to be <code>0.95</code> or 1351 * <code>1.75</code> multiplied by ( 1352 * <i>available memory for reduce tasks</i> 1353 * (The value of this should be smaller than 1354 * numNodes * yarn.nodemanager.resource.memory-mb 1355 * since the resource of memory is shared by map tasks and other 1356 * applications) / 1357 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.reduce.memory.mb"> 1358 * mapreduce.reduce.memory.mb</a>). 1359 * </p> 1360 * 1361 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1362 * start transfering map outputs as the maps finish. With <code>1.75</code> 1363 * the faster nodes will finish their first round of reduces and launch a 1364 * second wave of reduces doing a much better job of load balancing.</p> 1365 * 1366 * <p>Increasing the number of reduces increases the framework overhead, but 1367 * increases load balancing and lowers the cost of failures.</p> 1368 * 1369 * <p>The scaling factors above are slightly less than whole numbers to 1370 * reserve a few reduce slots in the framework for speculative-tasks, failures 1371 * etc.</p> 1372 * 1373 * <b id="ReducerNone">Reducer NONE</b> 1374 * 1375 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1376 * 1377 * <p>In this case the output of the map-tasks directly go to distributed 1378 * file-system, to the path set by 1379 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1380 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1381 * 1382 * @param n the number of reduce tasks for this job. 1383 */ 1384 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1385 1386 /** 1387 * Get the configured number of maximum attempts that will be made to run a 1388 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1389 * property. If this property is not already set, the default is 4 attempts. 1390 * 1391 * @return the max number of attempts per map task. 1392 */ 1393 public int getMaxMapAttempts() { 1394 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1395 } 1396 1397 /** 1398 * Expert: Set the number of maximum attempts that will be made to run a 1399 * map task. 1400 * 1401 * @param n the number of attempts per map task. 1402 */ 1403 public void setMaxMapAttempts(int n) { 1404 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1405 } 1406 1407 /** 1408 * Get the configured number of maximum attempts that will be made to run a 1409 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1410 * property. If this property is not already set, the default is 4 attempts. 1411 * 1412 * @return the max number of attempts per reduce task. 1413 */ 1414 public int getMaxReduceAttempts() { 1415 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1416 } 1417 /** 1418 * Expert: Set the number of maximum attempts that will be made to run a 1419 * reduce task. 1420 * 1421 * @param n the number of attempts per reduce task. 1422 */ 1423 public void setMaxReduceAttempts(int n) { 1424 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1425 } 1426 1427 /** 1428 * Get the user-specified job name. This is only used to identify the 1429 * job to the user. 1430 * 1431 * @return the job's name, defaulting to "". 1432 */ 1433 public String getJobName() { 1434 return get(JobContext.JOB_NAME, ""); 1435 } 1436 1437 /** 1438 * Set the user-specified job name. 1439 * 1440 * @param name the job's new name. 1441 */ 1442 public void setJobName(String name) { 1443 set(JobContext.JOB_NAME, name); 1444 } 1445 1446 /** 1447 * Get the user-specified session identifier. The default is the empty string. 1448 * 1449 * The session identifier is used to tag metric data that is reported to some 1450 * performance metrics system via the org.apache.hadoop.metrics API. The 1451 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1452 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1453 * HOD will set the session identifier by modifying the mapred-site.xml file 1454 * before starting the cluster. 1455 * 1456 * When not running under HOD, this identifer is expected to remain set to 1457 * the empty string. 1458 * 1459 * @return the session identifier, defaulting to "". 1460 */ 1461 @Deprecated 1462 public String getSessionId() { 1463 return get("session.id", ""); 1464 } 1465 1466 /** 1467 * Set the user-specified session identifier. 1468 * 1469 * @param sessionId the new session id. 1470 */ 1471 @Deprecated 1472 public void setSessionId(String sessionId) { 1473 set("session.id", sessionId); 1474 } 1475 1476 /** 1477 * Set the maximum no. of failures of a given job per tasktracker. 1478 * If the no. of task failures exceeds <code>noFailures</code>, the 1479 * tasktracker is <i>blacklisted</i> for this job. 1480 * 1481 * @param noFailures maximum no. of failures of a given job per tasktracker. 1482 */ 1483 public void setMaxTaskFailuresPerTracker(int noFailures) { 1484 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1485 } 1486 1487 /** 1488 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1489 * If the no. of task failures exceeds this, the tasktracker is 1490 * <i>blacklisted</i> for this job. 1491 * 1492 * @return the maximum no. of failures of a given job per tasktracker. 1493 */ 1494 public int getMaxTaskFailuresPerTracker() { 1495 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1496 } 1497 1498 /** 1499 * Get the maximum percentage of map tasks that can fail without 1500 * the job being aborted. 1501 * 1502 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1503 * attempts before being declared as <i>failed</i>. 1504 * 1505 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1506 * the job being declared as {@link JobStatus#FAILED}. 1507 * 1508 * @return the maximum percentage of map tasks that can fail without 1509 * the job being aborted. 1510 */ 1511 public int getMaxMapTaskFailuresPercent() { 1512 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1513 } 1514 1515 /** 1516 * Expert: Set the maximum percentage of map tasks that can fail without the 1517 * job being aborted. 1518 * 1519 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1520 * before being declared as <i>failed</i>. 1521 * 1522 * @param percent the maximum percentage of map tasks that can fail without 1523 * the job being aborted. 1524 */ 1525 public void setMaxMapTaskFailuresPercent(int percent) { 1526 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1527 } 1528 1529 /** 1530 * Get the maximum percentage of reduce tasks that can fail without 1531 * the job being aborted. 1532 * 1533 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1534 * attempts before being declared as <i>failed</i>. 1535 * 1536 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1537 * in the job being declared as {@link JobStatus#FAILED}. 1538 * 1539 * @return the maximum percentage of reduce tasks that can fail without 1540 * the job being aborted. 1541 */ 1542 public int getMaxReduceTaskFailuresPercent() { 1543 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1544 } 1545 1546 /** 1547 * Set the maximum percentage of reduce tasks that can fail without the job 1548 * being aborted. 1549 * 1550 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1551 * attempts before being declared as <i>failed</i>. 1552 * 1553 * @param percent the maximum percentage of reduce tasks that can fail without 1554 * the job being aborted. 1555 */ 1556 public void setMaxReduceTaskFailuresPercent(int percent) { 1557 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1558 } 1559 1560 /** 1561 * Set {@link JobPriority} for this job. 1562 * 1563 * @param prio the {@link JobPriority} for this job. 1564 */ 1565 public void setJobPriority(JobPriority prio) { 1566 set(JobContext.PRIORITY, prio.toString()); 1567 } 1568 1569 /** 1570 * Set {@link JobPriority} for this job. 1571 * 1572 * @param prio the {@link JobPriority} for this job. 1573 */ 1574 public void setJobPriorityAsInteger(int prio) { 1575 set(JobContext.PRIORITY, Integer.toString(prio)); 1576 } 1577 1578 /** 1579 * Get the {@link JobPriority} for this job. 1580 * 1581 * @return the {@link JobPriority} for this job. 1582 */ 1583 public JobPriority getJobPriority() { 1584 String prio = get(JobContext.PRIORITY); 1585 if (prio == null) { 1586 return JobPriority.DEFAULT; 1587 } 1588 1589 JobPriority priority = JobPriority.DEFAULT; 1590 try { 1591 priority = JobPriority.valueOf(prio); 1592 } catch (IllegalArgumentException e) { 1593 return convertToJobPriority(Integer.parseInt(prio)); 1594 } 1595 return priority; 1596 } 1597 1598 /** 1599 * Get the priority for this job. 1600 * 1601 * @return the priority for this job. 1602 */ 1603 public int getJobPriorityAsInteger() { 1604 String priority = get(JobContext.PRIORITY); 1605 if (priority == null) { 1606 return 0; 1607 } 1608 1609 int jobPriority = 0; 1610 try { 1611 jobPriority = convertPriorityToInteger(priority); 1612 } catch (IllegalArgumentException e) { 1613 return Integer.parseInt(priority); 1614 } 1615 return jobPriority; 1616 } 1617 1618 private int convertPriorityToInteger(String priority) { 1619 JobPriority jobPriority = JobPriority.valueOf(priority); 1620 switch (jobPriority) { 1621 case VERY_HIGH : 1622 return 5; 1623 case HIGH : 1624 return 4; 1625 case NORMAL : 1626 return 3; 1627 case LOW : 1628 return 2; 1629 case VERY_LOW : 1630 return 1; 1631 case DEFAULT : 1632 return 0; 1633 default: 1634 break; 1635 } 1636 1637 // If a user sets the priority as "UNDEFINED_PRIORITY", we can return 1638 // 0 which is also default value. 1639 return 0; 1640 } 1641 1642 private JobPriority convertToJobPriority(int priority) { 1643 switch (priority) { 1644 case 5 : 1645 return JobPriority.VERY_HIGH; 1646 case 4 : 1647 return JobPriority.HIGH; 1648 case 3 : 1649 return JobPriority.NORMAL; 1650 case 2 : 1651 return JobPriority.LOW; 1652 case 1 : 1653 return JobPriority.VERY_LOW; 1654 case 0 : 1655 return JobPriority.DEFAULT; 1656 default: 1657 break; 1658 } 1659 1660 return JobPriority.UNDEFINED_PRIORITY; 1661 } 1662 1663 /** 1664 * Set JobSubmitHostName for this job. 1665 * 1666 * @param hostname the JobSubmitHostName for this job. 1667 */ 1668 void setJobSubmitHostName(String hostname) { 1669 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1670 } 1671 1672 /** 1673 * Get the JobSubmitHostName for this job. 1674 * 1675 * @return the JobSubmitHostName for this job. 1676 */ 1677 String getJobSubmitHostName() { 1678 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1679 1680 return hostname; 1681 } 1682 1683 /** 1684 * Set JobSubmitHostAddress for this job. 1685 * 1686 * @param hostadd the JobSubmitHostAddress for this job. 1687 */ 1688 void setJobSubmitHostAddress(String hostadd) { 1689 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1690 } 1691 1692 /** 1693 * Get JobSubmitHostAddress for this job. 1694 * 1695 * @return JobSubmitHostAddress for this job. 1696 */ 1697 String getJobSubmitHostAddress() { 1698 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1699 1700 return hostadd; 1701 } 1702 1703 /** 1704 * Get whether the task profiling is enabled. 1705 * @return true if some tasks will be profiled 1706 */ 1707 public boolean getProfileEnabled() { 1708 return getBoolean(JobContext.TASK_PROFILE, false); 1709 } 1710 1711 /** 1712 * Set whether the system should collect profiler information for some of 1713 * the tasks in this job? The information is stored in the user log 1714 * directory. 1715 * @param newValue true means it should be gathered 1716 */ 1717 public void setProfileEnabled(boolean newValue) { 1718 setBoolean(JobContext.TASK_PROFILE, newValue); 1719 } 1720 1721 /** 1722 * Get the profiler configuration arguments. 1723 * 1724 * The default value for this property is 1725 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1726 * 1727 * @return the parameters to pass to the task child to configure profiling 1728 */ 1729 public String getProfileParams() { 1730 return get(JobContext.TASK_PROFILE_PARAMS, 1731 MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS); 1732 } 1733 1734 /** 1735 * Set the profiler configuration arguments. If the string contains a '%s' it 1736 * will be replaced with the name of the profiling output file when the task 1737 * runs. 1738 * 1739 * This value is passed to the task child JVM on the command line. 1740 * 1741 * @param value the configuration string 1742 */ 1743 public void setProfileParams(String value) { 1744 set(JobContext.TASK_PROFILE_PARAMS, value); 1745 } 1746 1747 /** 1748 * Get the range of maps or reduces to profile. 1749 * @param isMap is the task a map? 1750 * @return the task ranges 1751 */ 1752 public IntegerRanges getProfileTaskRange(boolean isMap) { 1753 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1754 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1755 } 1756 1757 /** 1758 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1759 * must also be called. 1760 * @param newValue a set of integer ranges of the map ids 1761 */ 1762 public void setProfileTaskRange(boolean isMap, String newValue) { 1763 // parse the value to make sure it is legal 1764 new Configuration.IntegerRanges(newValue); 1765 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1766 newValue); 1767 } 1768 1769 /** 1770 * Set the debug script to run when the map tasks fail. 1771 * 1772 * <p>The debug script can aid debugging of failed map tasks. The script is 1773 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1774 * 1775 * <p>The debug command, run on the node where the map failed, is:</p> 1776 * <p><blockquote><pre> 1777 * $script $stdout $stderr $syslog $jobconf. 1778 * </pre></blockquote> 1779 * 1780 * <p> The script file is distributed through {@link DistributedCache} 1781 * APIs. The script needs to be symlinked. </p> 1782 * 1783 * <p>Here is an example on how to submit a script 1784 * <p><blockquote><pre> 1785 * job.setMapDebugScript("./myscript"); 1786 * DistributedCache.createSymlink(job); 1787 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1788 * </pre></blockquote> 1789 * 1790 * @param mDbgScript the script name 1791 */ 1792 public void setMapDebugScript(String mDbgScript) { 1793 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1794 } 1795 1796 /** 1797 * Get the map task's debug script. 1798 * 1799 * @return the debug Script for the mapred job for failed map tasks. 1800 * @see #setMapDebugScript(String) 1801 */ 1802 public String getMapDebugScript() { 1803 return get(JobContext.MAP_DEBUG_SCRIPT); 1804 } 1805 1806 /** 1807 * Set the debug script to run when the reduce tasks fail. 1808 * 1809 * <p>The debug script can aid debugging of failed reduce tasks. The script 1810 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1811 * 1812 * <p>The debug command, run on the node where the map failed, is:</p> 1813 * <p><blockquote><pre> 1814 * $script $stdout $stderr $syslog $jobconf. 1815 * </pre></blockquote> 1816 * 1817 * <p> The script file is distributed through {@link DistributedCache} 1818 * APIs. The script file needs to be symlinked </p> 1819 * 1820 * <p>Here is an example on how to submit a script 1821 * <p><blockquote><pre> 1822 * job.setReduceDebugScript("./myscript"); 1823 * DistributedCache.createSymlink(job); 1824 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1825 * </pre></blockquote> 1826 * 1827 * @param rDbgScript the script name 1828 */ 1829 public void setReduceDebugScript(String rDbgScript) { 1830 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1831 } 1832 1833 /** 1834 * Get the reduce task's debug Script 1835 * 1836 * @return the debug script for the mapred job for failed reduce tasks. 1837 * @see #setReduceDebugScript(String) 1838 */ 1839 public String getReduceDebugScript() { 1840 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1841 } 1842 1843 /** 1844 * Get the uri to be invoked in-order to send a notification after the job 1845 * has completed (success/failure). 1846 * 1847 * @return the job end notification uri, <code>null</code> if it hasn't 1848 * been set. 1849 * @see #setJobEndNotificationURI(String) 1850 */ 1851 public String getJobEndNotificationURI() { 1852 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1853 } 1854 1855 /** 1856 * Set the uri to be invoked in-order to send a notification after the job 1857 * has completed (success/failure). 1858 * 1859 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1860 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 1861 * identifier and completion-status respectively.</p> 1862 * 1863 * <p>This is typically used by application-writers to implement chaining of 1864 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1865 * 1866 * @param uri the job end notification uri 1867 * @see JobStatus 1868 */ 1869 public void setJobEndNotificationURI(String uri) { 1870 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1871 } 1872 1873 /** 1874 * Get job-specific shared directory for use as scratch space 1875 * 1876 * <p> 1877 * When a job starts, a shared directory is created at location 1878 * <code> 1879 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1880 * This directory is exposed to the users through 1881 * <code>mapreduce.job.local.dir </code>. 1882 * So, the tasks can use this space 1883 * as scratch space and share files among them. </p> 1884 * This value is available as System property also. 1885 * 1886 * @return The localized job specific shared directory 1887 */ 1888 public String getJobLocalDir() { 1889 return get(JobContext.JOB_LOCAL_DIR); 1890 } 1891 1892 /** 1893 * Get memory required to run a map task of the job, in MB. 1894 * 1895 * If a value is specified in the configuration, it is returned. 1896 * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}. 1897 * <p> 1898 * For backward compatibility, if the job configuration sets the 1899 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1900 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1901 * after converting it from bytes to MB. 1902 * @return memory required to run a map task of the job, in MB, 1903 */ 1904 public long getMemoryForMapTask() { 1905 long value = getDeprecatedMemoryValue(); 1906 if (value < 0) { 1907 return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 1908 JobContext.DEFAULT_MAP_MEMORY_MB); 1909 } 1910 return value; 1911 } 1912 1913 public void setMemoryForMapTask(long mem) { 1914 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1915 // In case that M/R 1.x applications use the old property name 1916 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1917 } 1918 1919 /** 1920 * Get memory required to run a reduce task of the job, in MB. 1921 * 1922 * If a value is specified in the configuration, it is returned. 1923 * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}. 1924 * <p> 1925 * For backward compatibility, if the job configuration sets the 1926 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1927 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1928 * after converting it from bytes to MB. 1929 * @return memory required to run a reduce task of the job, in MB. 1930 */ 1931 public long getMemoryForReduceTask() { 1932 long value = getDeprecatedMemoryValue(); 1933 if (value < 0) { 1934 return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 1935 JobContext.DEFAULT_REDUCE_MEMORY_MB); 1936 } 1937 return value; 1938 } 1939 1940 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1941 // converted into MBs. 1942 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1943 // value. 1944 private long getDeprecatedMemoryValue() { 1945 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1946 DISABLED_MEMORY_LIMIT); 1947 if (oldValue > 0) { 1948 oldValue /= (1024*1024); 1949 } 1950 return oldValue; 1951 } 1952 1953 public void setMemoryForReduceTask(long mem) { 1954 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1955 // In case that M/R 1.x applications use the old property name 1956 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1957 } 1958 1959 /** 1960 * Return the name of the queue to which this job is submitted. 1961 * Defaults to 'default'. 1962 * 1963 * @return name of the queue 1964 */ 1965 public String getQueueName() { 1966 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1967 } 1968 1969 /** 1970 * Set the name of the queue to which this job should be submitted. 1971 * 1972 * @param queueName Name of the queue 1973 */ 1974 public void setQueueName(String queueName) { 1975 set(JobContext.QUEUE_NAME, queueName); 1976 } 1977 1978 /** 1979 * Normalize the negative values in configuration 1980 * 1981 * @param val 1982 * @return normalized value 1983 */ 1984 public static long normalizeMemoryConfigValue(long val) { 1985 if (val < 0) { 1986 val = DISABLED_MEMORY_LIMIT; 1987 } 1988 return val; 1989 } 1990 1991 /** 1992 * Find a jar that contains a class of the same name, if any. 1993 * It will return a jar file, even if that is not the first thing 1994 * on the class path that has a class with the same name. 1995 * 1996 * @param my_class the class to find. 1997 * @return a jar file that contains the class, or null. 1998 */ 1999 public static String findContainingJar(Class my_class) { 2000 return ClassUtil.findContainingJar(my_class); 2001 } 2002 2003 /** 2004 * Get the memory required to run a task of this job, in bytes. See 2005 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 2006 * <p> 2007 * This method is deprecated. Now, different memory limits can be 2008 * set for map and reduce tasks of a job, in MB. 2009 * <p> 2010 * For backward compatibility, if the job configuration sets the 2011 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. 2012 * Otherwise, this method will return the larger of the values returned by 2013 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 2014 * after converting them into bytes. 2015 * 2016 * @return Memory required to run a task of this job, in bytes. 2017 * @see #setMaxVirtualMemoryForTask(long) 2018 * @deprecated Use {@link #getMemoryForMapTask()} and 2019 * {@link #getMemoryForReduceTask()} 2020 */ 2021 @Deprecated 2022 public long getMaxVirtualMemoryForTask() { 2023 LOG.warn( 2024 "getMaxVirtualMemoryForTask() is deprecated. " + 2025 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 2026 2027 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 2028 Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024); 2029 return value; 2030 } 2031 2032 /** 2033 * Set the maximum amount of memory any task of this job can use. See 2034 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 2035 * <p> 2036 * mapred.task.maxvmem is split into 2037 * mapreduce.map.memory.mb 2038 * and mapreduce.map.memory.mb,mapred 2039 * each of the new key are set 2040 * as mapred.task.maxvmem / 1024 2041 * as new values are in MB 2042 * 2043 * @param vmem Maximum amount of virtual memory in bytes any task of this job 2044 * can use. 2045 * @see #getMaxVirtualMemoryForTask() 2046 * @deprecated 2047 * Use {@link #setMemoryForMapTask(long mem)} and 2048 * Use {@link #setMemoryForReduceTask(long mem)} 2049 */ 2050 @Deprecated 2051 public void setMaxVirtualMemoryForTask(long vmem) { 2052 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 2053 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 2054 if (vmem < 0) { 2055 throw new IllegalArgumentException("Task memory allocation may not be < 0"); 2056 } 2057 2058 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 2059 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 2060 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 2061 }else{ 2062 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 2063 } 2064 } 2065 2066 /** 2067 * @deprecated this variable is deprecated and nolonger in use. 2068 */ 2069 @Deprecated 2070 public long getMaxPhysicalMemoryForTask() { 2071 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 2072 + " Refer to the APIs getMemoryForMapTask() and" 2073 + " getMemoryForReduceTask() for details."); 2074 return -1; 2075 } 2076 2077 /* 2078 * @deprecated this 2079 */ 2080 @Deprecated 2081 public void setMaxPhysicalMemoryForTask(long mem) { 2082 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 2083 + " The value set is ignored. Refer to " 2084 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 2085 } 2086 2087 static String deprecatedString(String key) { 2088 return "The variable " + key + " is no longer used."; 2089 } 2090 2091 private void checkAndWarnDeprecation() { 2092 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 2093 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 2094 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY 2095 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY); 2096 } 2097 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 2098 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 2099 } 2100 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 2101 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 2102 } 2103 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 2104 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 2105 } 2106 } 2107 2108 2109 /* For debugging. Dump configurations to system output as XML format. */ 2110 public static void main(String[] args) throws Exception { 2111 new JobConf(new Configuration()).writeXml(System.out); 2112 } 2113 2114} 2115