/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straightforward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), other parameters interact subtly
 *   with the rest of the framework and/or job configuration and are relatively
 *   more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as the <code>Comparator</code>s to be used, files to be put
 * in the {@link DistributedCache}, whether intermediate and/or job outputs
 * are to be compressed (and how), and debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing a task's logs, stdout, stderr and syslog.</p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote></p>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  static {
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
      "mapred.task.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
      "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
      "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
      "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   */
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars. */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
      Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *             {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
      JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
      JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
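
  /*
   * Illustrative sketch (not part of the upstream class): setting per-task
   * java opts via the keys above. The @taskid@ token is interpolated by the
   * framework as documented; "conf" is a placeholder JobConf.
   *
   *   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
   *            "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
   *   conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx2048m");
   */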
  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
      "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *             {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will inherit the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This will inherit the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
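
  /*
   * Illustrative sketch (not part of the upstream class): passing environment
   * variables to child tasks using the k1=v1,k2=v2 format described above.
   * "conf" is a placeholder JobConf; LD_LIBRARY_PATH is just an example key.
   *
   *   conf.set(JobConf.MAPRED_MAP_TASK_ENV,
   *            "A=foo,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native");
   */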

  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
      JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
      JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead.
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead.
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead.
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead.
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * not use it.
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications; M/R 2.x applications should
   * not use it.
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }


  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }

  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }



  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if the framework should keep the intermediate
   *             files for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *                task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
   * @param numTasks the number of tasks to execute; defaults to 1;
   *                 -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
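
  /*
   * Illustrative sketch (not part of the upstream class): JVM reuse. Per the
   * javadoc above, 1 (the default) gives each task a fresh JVM, a larger
   * value amortizes JVM startup across that many tasks, and -1 removes the
   * limit.
   *
   *   conf.setNumTasksToExecutePerJvm(-1);  // reuse one JVM for all tasks
   */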

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }
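
  /*
   * Illustrative sketch (not part of the upstream class): compressing
   * intermediate map output to cut shuffle traffic. GzipCodec is one of the
   * codecs shipped in org.apache.hadoop.io.compress; note that
   * setMapOutputCompressorClass(...) also enables compression implicitly.
   *
   *   conf.setCompressMapOutput(true);
   *   conf.setMapOutputCompressorClass(
   *       org.apache.hadoop.io.compress.GzipCodec.class);
   */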

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different from the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different from the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedComparator} options.
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }
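
  /*
   * Illustrative sketch (not part of the upstream class): key-field based
   * sorting and partitioning, using the -k pos1[,pos2] syntax documented
   * above. This sorts on the second field, numerically and in reverse, while
   * partitioning on the first field only.
   *
   *   conf.setKeyFieldComparatorOptions("-k2,2nr");
   *   conf.setKeyFieldPartitionerOptions("-k1,1");
   */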

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction with it to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   *                 combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction with it to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }
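
  /*
   * Illustrative sketch (not part of the upstream class): the secondary-sort
   * pattern mentioned above. FullKeyComparator and FirstPartComparator are
   * hypothetical RawComparator implementations: the first orders on the whole
   * composite key, the second only on its natural part, so values arrive at
   * the reducer sorted within each group.
   *
   *   conf.setOutputKeyComparatorClass(FullKeyComparator.class);
   *   conf.setOutputValueGroupingComparator(FirstPartComparator.class);
   */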

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }
  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   *
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }

  /**
   * Set the {@link Mapper} class for the job.
   *
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   *
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }

  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   *
   * Typically used to exert greater control on {@link Mapper}s.
   *
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
   * to be sent to the {@link Reducer}s.
   *
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

  /**
   * Set the {@link Partitioner} class used to partition
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   *
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }

  /**
   * Set the {@link Reducer} class for the job.
   *
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
   *
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers.
   *
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   *
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and
   *        output value types</li>
   * </ul>
   *
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job i.e. {@link #setReducerClass(Class)}.</p>
   *
   * @param theClass the user-defined combiner class used to combine
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }

  /**
   * Should speculative execution be used for this job?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used for
   *         this job, <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() {
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be
   *         used for this job for map tasks,
   *         <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() {
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }
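
  /*
   * Illustrative sketch (not part of the upstream class): speculative
   * execution is on by default; a common reason to turn it off is a task that
   * writes to an external, non-idempotent sink, where duplicate speculative
   * attempts would be harmful.
   *
   *   conf.setSpeculativeExecution(false);         // both phases, or...
   *   conf.setReduceSpeculativeExecution(false);   // ...just reduces
   */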

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used
   *         for reduce tasks for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() {
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE,
               speculativeExecution);
  }

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   *
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }

  /**
   * Set the number of map tasks for this job.
   *
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
   * number of spawned map tasks depends on the number of {@link InputSplit}s
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *
   * A custom {@link InputFormat} is typically used to accurately control
   * the number of map tasks for the job.</p>
   *
   * <h4 id="NoOfMaps">How many maps?</h4>
   *
   * <p>The number of maps is usually driven by the total size of the inputs,
   * i.e. the total number of blocks of the input files.</p>
   *
   * <p>The right level of parallelism for maps seems to be around 10-100 maps
   * per-node, although it has been set up to 300 or so for very cpu-light map
   * tasks. Task setup takes a while, so it is best if the maps take at least a
   * minute to execute.</p>
   *
   * <p>The default behavior of file-based {@link InputFormat}s is to split the
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in
   * bytes, of the input files. However, the {@link FileSystem} blocksize of the
   * input files is treated as an upper bound for input splits. A lower bound
   * on the split size can be set via
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
   * used to set it even higher.</p>
   *
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }

  /**
   * Get the configured number of reduce tasks for this job. Defaults to
   * <code>1</code>.
   *
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
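
  /*
   * Worked example for the javadoc above (an approximation, assuming splits
   * equal to whole 128MB blocks): 10TB of input is 10 * 1024 * 1024 MB, and
   * 10,485,760 MB / 128 MB = 81,920 splits, hence roughly 82,000 map tasks.
   */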

  /**
   * Set the requisite number of reduce tasks for this job.
   *
   * <h4 id="NoOfReduces">How many reduces?</h4>
   *
   * <p>The right number of reduces seems to be <code>0.95</code> or
   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
   * </p>
   *
   * <p>With <code>0.95</code> all of the reduces can launch immediately and
   * start transferring map outputs as the maps finish. With <code>1.75</code>
   * the faster nodes will finish their first round of reduces and launch a
   * second wave of reduces doing a much better job of load balancing.</p>
   *
   * <p>Increasing the number of reduces increases the framework overhead, but
   * improves load balancing and lowers the cost of failures.</p>
   *
   * <p>The scaling factors above are slightly less than whole numbers to
   * reserve a few reduce slots in the framework for speculative-tasks, failures
   * etc.</p>
   *
   * <h4 id="ReducerNone">Reducer NONE</h4>
   *
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   *
   * <p>In this case the outputs of the map-tasks go directly to the distributed
   * file-system, to the path set by
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   *
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
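
  /*
   * Worked example for the javadoc above: with 10 nodes and
   * mapreduce.tasktracker.reduce.tasks.maximum = 4, the 0.95 factor gives
   * 0.95 * 10 * 4 = 38 reduces launching in a single wave:
   *
   *   conf.setNumReduceTasks(38);
   */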

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per map task.
   */
  public int getMaxMapAttempts() {
    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }
  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }

  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API. The
   * session identifier is intended, in particular, for use by Hadoop-On-Demand
   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
   * HOD will set the session identifier by modifying the mapred-site.xml file
   * before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
    return get("session.id", "");
  }

  /**
   * Set the user-specified session identifier.
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
    set("session.id", sessionId);
  }

  /**
   * Set the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds <code>noFailures</code>, the
   * tasktracker is <i>blacklisted</i> for this job.
   *
   * @param noFailures maximum no. of failures of a given job per tasktracker.
   */
  public void setMaxTaskFailuresPerTracker(int noFailures) {
    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
  }

  /**
   * Expert: Get the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds this, the tasktracker is
   * <i>blacklisted</i> for this job.
   *
   * @return the maximum no. of failures of a given job per tasktracker.
   */
  public int getMaxTaskFailuresPerTracker() {
    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
  }

  /**
   * Get the maximum percentage of map tasks that can fail without
   * the job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
   * the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of map tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxMapTaskFailuresPercent() {
    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
  }

  /**
   * Expert: Set the maximum percentage of map tasks that can fail without the
   * job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
   * before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of map tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxMapTaskFailuresPercent(int percent) {
    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
  }

  /**
   * Get the maximum percentage of reduce tasks that can fail without
   * the job being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
   * in the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of reduce tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxReduceTaskFailuresPercent() {
    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
  }

  /**
   * Set the maximum percentage of reduce tasks that can fail without the job
   * being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of reduce tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxReduceTaskFailuresPercent(int percent) {
    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
  }

  /**
   * Set {@link JobPriority} for this job.
   *
   * @param prio the {@link JobPriority} for this job.
   */
  public void setJobPriority(JobPriority prio) {
    set(JobContext.PRIORITY, prio.toString());
  }

  /**
   * Get the {@link JobPriority} for this job.
   *
   * @return the {@link JobPriority} for this job.
   */
  public JobPriority getJobPriority() {
    String prio = get(JobContext.PRIORITY);
    if (prio == null) {
      return JobPriority.NORMAL;
    }

    return JobPriority.valueOf(prio);
  }

  /**
   * Set the JobSubmitHostName for this job.
   *
   * @param hostname the JobSubmitHostName for this job.
   */
  void setJobSubmitHostName(String hostname) {
    set(MRJobConfig.JOB_SUBMITHOST, hostname);
  }

  /**
   * Get the JobSubmitHostName for this job.
   *
   * @return the JobSubmitHostName for this job.
   */
  String getJobSubmitHostName() {
    String hostname = get(MRJobConfig.JOB_SUBMITHOST);

    return hostname;
  }

  /**
   * Set the JobSubmitHostAddress for this job.
   *
   * @param hostadd the JobSubmitHostAddress for this job.
   */
  void setJobSubmitHostAddress(String hostadd) {
    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
  }

  /**
   * Get the JobSubmitHostAddress for this job.
   *
   * @return the JobSubmitHostAddress for this job.
   */
  String getJobSubmitHostAddress() {
    String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);

    return hostadd;
  }

  /**
   * Get whether task profiling is enabled.
   * @return true if some tasks will be profiled
   */
  public boolean getProfileEnabled() {
    return getBoolean(JobContext.TASK_PROFILE, false);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    setBoolean(JobContext.TASK_PROFILE, newValue);
  }

  /**
   * Get the profiler configuration arguments.
   *
   * The default value for this property is
   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
   *
   * @return the parameters to pass to the task child to configure profiling
   */
  public String getProfileParams() {
    return get(JobContext.TASK_PROFILE_PARAMS,
               "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
               "verbose=n,file=%s");
  }
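
  /*
   * Illustrative sketch (not part of the upstream class): profiling the first
   * three map-task attempts with the default hprof parameters shown above.
   *
   *   conf.setProfileEnabled(true);
   *   conf.setProfileTaskRange(true, "0-2");   // true = map tasks
   *   conf.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
   *       "force=n,thread=y,verbose=n,file=%s");
   */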
  /**
   * Set the debug script to run when the map tasks fail.
   *
   * <p>The debug script can aid debugging of failed map tasks. The script is
   * given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the map failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setMapDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(
   *     new URI("/debug/scripts/myscript#myscript"), job);
   * </pre></blockquote></p>
   *
   * @param mDbgScript the script name
   */
  public void setMapDebugScript(String mDbgScript) {
    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
  }

  /**
   * Get the map task's debug script.
   *
   * @return the debug script for the mapred job for failed map tasks.
   * @see #setMapDebugScript(String)
   */
  public String getMapDebugScript() {
    return get(JobContext.MAP_DEBUG_SCRIPT);
  }

  /**
   * Set the debug script to run when the reduce tasks fail.
   *
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the reduce failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script file needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setReduceDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(
   *     new URI("/debug/scripts/myscript#myscript"), job);
   * </pre></blockquote></p>
   *
   * @param rDbgScript the script name
   */
  public void setReduceDebugScript(String rDbgScript) {
    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
  }

  /**
   * Get the reduce task's debug script.
   *
   * @return the debug script for the mapred job for failed reduce tasks.
   * @see #setReduceDebugScript(String)
   */
  public String getReduceDebugScript() {
    return get(JobContext.REDUCE_DEBUG_SCRIPT);
  }

  /**
   * Get the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * @return the job end notification uri, <code>null</code> if it hasn't
   *         been set.
   * @see #setJobEndNotificationURI(String)
   */
  public String getJobEndNotificationURI() {
    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
  }

  /**
   * Set the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
   * identifier and completion-status respectively.</p>
   *
   * <p>This is typically used by application-writers to implement chaining of
   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
   *
   * @param uri the job end notification uri
   * @see JobStatus
   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">
   *      Job Completion and Chaining</a>
   */
  public void setJobEndNotificationURI(String uri) {
    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
  }
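  // Illustrative usage sketch (not part of the original source): registering
  // a job-end notification URL; $jobId and $jobStatus are expanded by the
  // framework as documented above. The host and path are hypothetical.
  //
  //   job.setJobEndNotificationURI(
  //       "http://workflow.example.com/notify?id=$jobId&status=$jobStatus");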
  /**
   * Get the job-specific shared directory for use as scratch space.
   *
   * <p>When a job starts, a shared directory is created at the location
   * <code>
   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/</code>.
   * This directory is exposed to the users through
   * <code>mapreduce.job.local.dir</code>, so tasks can use it as scratch
   * space and share files among themselves.</p>
   *
   * This value is also available as a system property.
   *
   * @return the localized job-specific shared directory
   */
  public String getJobLocalDir() {
    return get(JobContext.JOB_LOCAL_DIR);
  }

  /**
   * Get memory required to run a map task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   *
   * @return memory required to run a map task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForMapTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    // In case M/R 1.x applications use the old property name
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  /**
   * Set the memory required to run a map task of the job, in MB.
   *
   * @param mem memory required for a map task, in MB.
   */
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Get memory required to run a reduce task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   *
   * @return memory required to run a reduce task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForReduceTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    // In case M/R 1.x applications use the old property name
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
                getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
                        DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  // Return the value set for the key MAPRED_TASK_MAXVMEM_PROPERTY,
  // converted into MB.
  // Returns DISABLED_MEMORY_LIMIT if unset, or if set to a negative value.
  private long getDeprecatedMemoryValue() {
    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
        DISABLED_MEMORY_LIMIT);
    oldValue = normalizeMemoryConfigValue(oldValue);
    if (oldValue != DISABLED_MEMORY_LIMIT) {
      oldValue /= (1024 * 1024);
    }
    return oldValue;
  }

  /**
   * Set the memory required to run a reduce task of the job, in MB.
   *
   * @param mem memory required for a reduce task, in MB.
   */
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }
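  // Illustrative usage sketch (not part of the original source): requesting
  // per-task memory in MB through the setters above (the values shown are
  // hypothetical). Both the mapreduce.* and the legacy mapred.* property
  // names are updated by these calls.
  //
  //   job.setMemoryForMapTask(1536);     // map tasks may use up to 1536 MB
  //   job.setMemoryForReduceTask(3072);  // reduce tasks may use up to 3072 MB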
  /**
   * Return the name of the queue to which this job is submitted.
   * Defaults to 'default'.
   *
   * @return name of the queue
   */
  public String getQueueName() {
    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
  }

  /**
   * Set the name of the queue to which this job should be submitted.
   *
   * @param queueName name of the queue
   */
  public void setQueueName(String queueName) {
    set(JobContext.QUEUE_NAME, queueName);
  }

  /**
   * Normalize a negative memory configuration value to
   * {@link #DISABLED_MEMORY_LIMIT}.
   *
   * @param val the configured value
   * @return the normalized value
   */
  public static long normalizeMemoryConfigValue(long val) {
    if (val < 0) {
      val = DISABLED_MEMORY_LIMIT;
    }
    return val;
  }

  /**
   * Compute the number of slots required to run a single map task-attempt
   * of this job.
   *
   * @param slotSizePerMap cluster-wide value of the amount of memory required
   *                       to run a map-task
   * @return the number of slots required to run a single map task-attempt,
   *         or 1 if memory parameters are disabled.
   */
  int computeNumSlotsPerMap(long slotSizePerMap) {
    if ((slotSizePerMap == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForMapTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil((float) getMemoryForMapTask() /
                           (float) slotSizePerMap);
  }

  /**
   * Compute the number of slots required to run a single reduce task-attempt
   * of this job.
   *
   * @param slotSizePerReduce cluster-wide value of the amount of memory
   *                          required to run a reduce-task
   * @return the number of slots required to run a single reduce task-attempt,
   *         or 1 if memory parameters are disabled.
   */
  int computeNumSlotsPerReduce(long slotSizePerReduce) {
    if ((slotSizePerReduce == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForReduceTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil((float) getMemoryForReduceTask() /
                           (float) slotSizePerReduce);
  }
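  // Worked example (added for illustration): if getMemoryForMapTask() returns
  // 1536 MB and the cluster-wide map slot size is 1024 MB, then
  // computeNumSlotsPerMap(1024) returns ceil(1536 / 1024) = 2 slots. If either
  // value is DISABLED_MEMORY_LIMIT, the computation is skipped and 1 slot is
  // assumed. The reduce-side computation is symmetric.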
  /**
   * Find a jar that contains a class of the same name, if any.
   * It will return a jar file, even if that is not the first thing
   * on the class path that has a class with the same name.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  public static String findContainingJar(Class my_class) {
    return ClassUtil.findContainingJar(my_class);
  }
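  // Illustrative usage sketch (not part of the original source): one way a
  // client might use findContainingJar() to locate and register the jar that
  // ships a job's classes ("MyMapper" is hypothetical).
  //
  //   String jar = JobConf.findContainingJar(MyMapper.class);
  //   if (jar != null) {
  //     job.setJar(jar);
  //   }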
  /**
   * Get the memory required to run a task of this job, in bytes. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p/>
   * This method is deprecated. Now, different memory limits can be
   * set for map and reduce tasks of a job, in MB.
   * <p/>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
   * Otherwise, this method will return the larger of the values returned by
   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
   * after converting them into bytes.
   *
   * @return memory required to run a task of this job, in bytes,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   * @see #setMaxVirtualMemoryForTask(long)
   * @deprecated Use {@link #getMemoryForMapTask()} and
   *             {@link #getMemoryForReduceTask()}
   */
  @Deprecated
  public long getMaxVirtualMemoryForTask() {
    LOG.warn(
      "getMaxVirtualMemoryForTask() is deprecated. " +
      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");

    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
    value = normalizeMemoryConfigValue(value);
    if (value == DISABLED_MEMORY_LIMIT) {
      value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
      value = normalizeMemoryConfigValue(value);
      if (value != DISABLED_MEMORY_LIMIT) {
        value *= 1024 * 1024;
      }
    }
    return value;
  }

  /**
   * Set the maximum amount of memory any task of this job can use. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p/>
   * mapred.task.maxvmem is split into mapreduce.map.memory.mb and
   * mapreduce.reduce.memory.mb; each of the new keys is set to
   * mapred.task.maxvmem / (1024 * 1024), as the new values are in MB.
   *
   * @param vmem Maximum amount of virtual memory in bytes any task of this job
   *             can use.
   * @see #getMaxVirtualMemoryForTask()
   * @deprecated Use {@link #setMemoryForMapTask(long mem)} and
   *             {@link #setMemoryForReduceTask(long mem)}
   */
  @Deprecated
  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
             "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
    }

    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024)); // Convert bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // Convert bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
  }

  /**
   * @deprecated this method is deprecated and no longer in use.
   */
  @Deprecated
  public long getMaxPhysicalMemoryForTask() {
    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
             + " Refer to the APIs getMemoryForMapTask() and"
             + " getMemoryForReduceTask() for details.");
    return -1;
  }

  /**
   * @deprecated this method is deprecated and no longer in use; the value
   * passed is ignored.
   */
  @Deprecated
  public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
             + " The value set is ignored. Refer to"
             + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
  }

  static String deprecatedString(String key) {
    return "The variable " + key + " is no longer used.";
  }

  // Warn if any deprecated memory-related properties are present in the
  // configuration.
  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
               + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
               + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
  }

}