/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import
org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as-is described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straight-forward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
 *   with the rest of the framework and/or job-configuration and are relatively
 *   more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol></p>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as <code>Comparator</code>s to be used, files to be put in
 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), debuggability via user-provided scripts
 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
 * for doing post-processing on task logs, task's stdout, stderr, syslog, etc.
 * </p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote></p>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  // Ensure the mapred-default/mapred-site resources (and deprecated-key
  // mappings) are registered before any JobConf is used.
  static{
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * @deprecated Deprecated configuration key; kept only for backward
   * compatibility and no longer consulted by the framework visible here.
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * @deprecated Deprecated configuration key; kept only for backward
   * compatibility and no longer consulted by the framework visible here.
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * @deprecated Deprecated configuration key; kept only for backward
   * compatibility and no longer consulted by the framework visible here.
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which if set for memory related configuration options,
   * indicates that the options are turned off.
   */
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
    JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
    JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilo-bytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
    "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *                 {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

  // Per-job credentials (tokens/secrets); shared by reference when this
  // JobConf is copy-constructed from another JobConf.
  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
    JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
    JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    // Share (not copy) the credentials when cloning from another JobConf, so
    // tokens added later are visible through both instances.
    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p/>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   *
   * @return the configured pattern, or {@link #UNPACK_JAR_PATTERN_DEFAULT}
   *         if none is set.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }


  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    // The jar may be null when the class is not loaded from a jar (e.g. a
    // classes directory); in that case the jar setting is left untouched.
    if (jar != null) {
      setJar(jar);
    }
  }

  /**
   * Get the configured local directories (mapreduce.cluster.local.dir),
   * trimmed of surrounding whitespace.
   *
   * @return the local directory names.
   */
  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  /**
   * Recursively delete the given subdirectory from each configured local
   * directory.
   *
   * @param subdir the subdirectory name to delete under each local dir.
   */
  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }



  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if framework should keep the intermediate files
   *             for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *        task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    // Resolve relative paths against the existing working directory before
    // storing, so the stored value is always absolute.
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      // Not set yet: fall back to the default FileSystem's working directory
      // and cache it in the configuration for subsequent calls.
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits
   * @param numTasks the number of tasks to execute; defaults to 1;
   *                 -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                             TextInputFormat.class,
                                                             InputFormat.class),
                                                    this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                              TextOutputFormat.class,
                                                              OutputFormat.class),
                                                     this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the  {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    // Setting a codec implies compression should be enabled.
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * value class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class This allows the map output value class to be
   * different than the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    // No explicit comparator configured: fall back to the registered
    // WritableComparator for the map output key type.
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedComparator} options
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character posns are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      // No dedicated combiner-grouping comparator: reuse the sort comparator.
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      // No dedicated grouping comparator: reuse the sort comparator.
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   * <p/>
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   * <p/>
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   * <p/>
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   * <p/>
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   * combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
        theClass, RawComparator.class);
  }

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<?
extends RawComparator> theClass) { 1044 setClass(JobContext.GROUP_COMPARATOR_CLASS, 1045 theClass, RawComparator.class); 1046 } 1047 1048 /** 1049 * Should the framework use the new context-object code for running 1050 * the mapper? 1051 * @return true, if the new api should be used 1052 */ 1053 public boolean getUseNewMapper() { 1054 return getBoolean("mapred.mapper.new-api", false); 1055 } 1056 /** 1057 * Set whether the framework should use the new api for the mapper. 1058 * This is the default for jobs submitted with the new Job api. 1059 * @param flag true, if the new api should be used 1060 */ 1061 public void setUseNewMapper(boolean flag) { 1062 setBoolean("mapred.mapper.new-api", flag); 1063 } 1064 1065 /** 1066 * Should the framework use the new context-object code for running 1067 * the reducer? 1068 * @return true, if the new api should be used 1069 */ 1070 public boolean getUseNewReducer() { 1071 return getBoolean("mapred.reducer.new-api", false); 1072 } 1073 /** 1074 * Set whether the framework should use the new api for the reducer. 1075 * This is the default for jobs submitted with the new Job api. 1076 * @param flag true, if the new api should be used 1077 */ 1078 public void setUseNewReducer(boolean flag) { 1079 setBoolean("mapred.reducer.new-api", flag); 1080 } 1081 1082 /** 1083 * Get the value class for job outputs. 1084 * 1085 * @return the value class for job outputs. 1086 */ 1087 public Class<?> getOutputValueClass() { 1088 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 1089 } 1090 1091 /** 1092 * Set the value class for job outputs. 1093 * 1094 * @param theClass the value class for job outputs. 1095 */ 1096 public void setOutputValueClass(Class<?> theClass) { 1097 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 1098 } 1099 1100 /** 1101 * Get the {@link Mapper} class for the job. 1102 * 1103 * @return the {@link Mapper} class for the job. 1104 */ 1105 public Class<? 
extends Mapper> getMapperClass() { 1106 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 1107 } 1108 1109 /** 1110 * Set the {@link Mapper} class for the job. 1111 * 1112 * @param theClass the {@link Mapper} class for the job. 1113 */ 1114 public void setMapperClass(Class<? extends Mapper> theClass) { 1115 setClass("mapred.mapper.class", theClass, Mapper.class); 1116 } 1117 1118 /** 1119 * Get the {@link MapRunnable} class for the job. 1120 * 1121 * @return the {@link MapRunnable} class for the job. 1122 */ 1123 public Class<? extends MapRunnable> getMapRunnerClass() { 1124 return getClass("mapred.map.runner.class", 1125 MapRunner.class, MapRunnable.class); 1126 } 1127 1128 /** 1129 * Expert: Set the {@link MapRunnable} class for the job. 1130 * 1131 * Typically used to exert greater control on {@link Mapper}s. 1132 * 1133 * @param theClass the {@link MapRunnable} class for the job. 1134 */ 1135 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1136 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1137 } 1138 1139 /** 1140 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1141 * to be sent to the {@link Reducer}s. 1142 * 1143 * @return the {@link Partitioner} used to partition map-outputs. 1144 */ 1145 public Class<? extends Partitioner> getPartitionerClass() { 1146 return getClass("mapred.partitioner.class", 1147 HashPartitioner.class, Partitioner.class); 1148 } 1149 1150 /** 1151 * Set the {@link Partitioner} class used to partition 1152 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1153 * 1154 * @param theClass the {@link Partitioner} used to partition map-outputs. 1155 */ 1156 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1157 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1158 } 1159 1160 /** 1161 * Get the {@link Reducer} class for the job. 1162 * 1163 * @return the {@link Reducer} class for the job. 
1164 */ 1165 public Class<? extends Reducer> getReducerClass() { 1166 return getClass("mapred.reducer.class", 1167 IdentityReducer.class, Reducer.class); 1168 } 1169 1170 /** 1171 * Set the {@link Reducer} class for the job. 1172 * 1173 * @param theClass the {@link Reducer} class for the job. 1174 */ 1175 public void setReducerClass(Class<? extends Reducer> theClass) { 1176 setClass("mapred.reducer.class", theClass, Reducer.class); 1177 } 1178 1179 /** 1180 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1181 * before being sent to the reducers. Typically the combiner is same as the 1182 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1183 * 1184 * @return the user-defined combiner class used to combine map-outputs. 1185 */ 1186 public Class<? extends Reducer> getCombinerClass() { 1187 return getClass("mapred.combiner.class", null, Reducer.class); 1188 } 1189 1190 /** 1191 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1192 * before being sent to the reducers. 1193 * 1194 * <p>The combiner is an application-specified aggregation operation, which 1195 * can help cut down the amount of data transferred between the 1196 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1197 * 1198 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1199 * the mapper and reducer tasks. In general, the combiner is called as the 1200 * sort/merge result is written to disk. The combiner must: 1201 * <ul> 1202 * <li> be side-effect free</li> 1203 * <li> have the same input and output key types and the same input and 1204 * output value types</li> 1205 * </ul></p> 1206 * 1207 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1208 * job i.e. {@link #setReducerClass(Class)}.</p> 1209 * 1210 * @param theClass the user-defined combiner class used to combine 1211 * map-outputs. 1212 */ 1213 public void setCombinerClass(Class<? 
extends Reducer> theClass) { 1214 setClass("mapred.combiner.class", theClass, Reducer.class); 1215 } 1216 1217 /** 1218 * Should speculative execution be used for this job? 1219 * Defaults to <code>true</code>. 1220 * 1221 * @return <code>true</code> if speculative execution be used for this job, 1222 * <code>false</code> otherwise. 1223 */ 1224 public boolean getSpeculativeExecution() { 1225 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1226 } 1227 1228 /** 1229 * Turn speculative execution on or off for this job. 1230 * 1231 * @param speculativeExecution <code>true</code> if speculative execution 1232 * should be turned on, else <code>false</code>. 1233 */ 1234 public void setSpeculativeExecution(boolean speculativeExecution) { 1235 setMapSpeculativeExecution(speculativeExecution); 1236 setReduceSpeculativeExecution(speculativeExecution); 1237 } 1238 1239 /** 1240 * Should speculative execution be used for this job for map tasks? 1241 * Defaults to <code>true</code>. 1242 * 1243 * @return <code>true</code> if speculative execution be 1244 * used for this job for map tasks, 1245 * <code>false</code> otherwise. 1246 */ 1247 public boolean getMapSpeculativeExecution() { 1248 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1249 } 1250 1251 /** 1252 * Turn speculative execution on or off for this job for map tasks. 1253 * 1254 * @param speculativeExecution <code>true</code> if speculative execution 1255 * should be turned on for map tasks, 1256 * else <code>false</code>. 1257 */ 1258 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1259 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1260 } 1261 1262 /** 1263 * Should speculative execution be used for this job for reduce tasks? 1264 * Defaults to <code>true</code>. 1265 * 1266 * @return <code>true</code> if speculative execution be used 1267 * for reduce tasks for this job, 1268 * <code>false</code> otherwise. 
1269 */ 1270 public boolean getReduceSpeculativeExecution() { 1271 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1272 } 1273 1274 /** 1275 * Turn speculative execution on or off for this job for reduce tasks. 1276 * 1277 * @param speculativeExecution <code>true</code> if speculative execution 1278 * should be turned on for reduce tasks, 1279 * else <code>false</code>. 1280 */ 1281 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1282 setBoolean(JobContext.REDUCE_SPECULATIVE, 1283 speculativeExecution); 1284 } 1285 1286 /** 1287 * Get configured the number of reduce tasks for this job. 1288 * Defaults to <code>1</code>. 1289 * 1290 * @return the number of reduce tasks for this job. 1291 */ 1292 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1293 1294 /** 1295 * Set the number of map tasks for this job. 1296 * 1297 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1298 * number of spawned map tasks depends on the number of {@link InputSplit}s 1299 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1300 * 1301 * A custom {@link InputFormat} is typically used to accurately control 1302 * the number of map tasks for the job.</p> 1303 * 1304 * <h4 id="NoOfMaps">How many maps?</h4> 1305 * 1306 * <p>The number of maps is usually driven by the total size of the inputs 1307 * i.e. total number of blocks of the input files.</p> 1308 * 1309 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1310 * per-node, although it has been set up to 300 or so for very cpu-light map 1311 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1312 * minute to execute.</p> 1313 * 1314 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1315 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1316 * bytes, of input files. 
However, the {@link FileSystem} blocksize of the 1317 * input files is treated as an upper bound for input splits. A lower bound 1318 * on the split size can be set via 1319 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1320 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1321 * 1322 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1323 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1324 * used to set it even higher.</p> 1325 * 1326 * @param n the number of map tasks for this job. 1327 * @see InputFormat#getSplits(JobConf, int) 1328 * @see FileInputFormat 1329 * @see FileSystem#getDefaultBlockSize() 1330 * @see FileStatus#getBlockSize() 1331 */ 1332 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1333 1334 /** 1335 * Get configured the number of reduce tasks for this job. Defaults to 1336 * <code>1</code>. 1337 * 1338 * @return the number of reduce tasks for this job. 1339 */ 1340 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1341 1342 /** 1343 * Set the requisite number of reduce tasks for this job. 1344 * 1345 * <h4 id="NoOfReduces">How many reduces?</h4> 1346 * 1347 * <p>The right number of reduces seems to be <code>0.95</code> or 1348 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1349 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1350 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1351 * </p> 1352 * 1353 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1354 * start transfering map outputs as the maps finish. 
With <code>1.75</code> 1355 * the faster nodes will finish their first round of reduces and launch a 1356 * second wave of reduces doing a much better job of load balancing.</p> 1357 * 1358 * <p>Increasing the number of reduces increases the framework overhead, but 1359 * increases load balancing and lowers the cost of failures.</p> 1360 * 1361 * <p>The scaling factors above are slightly less than whole numbers to 1362 * reserve a few reduce slots in the framework for speculative-tasks, failures 1363 * etc.</p> 1364 * 1365 * <h4 id="ReducerNone">Reducer NONE</h4> 1366 * 1367 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1368 * 1369 * <p>In this case the output of the map-tasks directly go to distributed 1370 * file-system, to the path set by 1371 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1372 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1373 * 1374 * @param n the number of reduce tasks for this job. 1375 */ 1376 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1377 1378 /** 1379 * Get the configured number of maximum attempts that will be made to run a 1380 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1381 * property. If this property is not already set, the default is 4 attempts. 1382 * 1383 * @return the max number of attempts per map task. 1384 */ 1385 public int getMaxMapAttempts() { 1386 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1387 } 1388 1389 /** 1390 * Expert: Set the number of maximum attempts that will be made to run a 1391 * map task. 1392 * 1393 * @param n the number of attempts per map task. 1394 */ 1395 public void setMaxMapAttempts(int n) { 1396 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1397 } 1398 1399 /** 1400 * Get the configured number of maximum attempts that will be made to run a 1401 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1402 * property. 
If this property is not already set, the default is 4 attempts. 1403 * 1404 * @return the max number of attempts per reduce task. 1405 */ 1406 public int getMaxReduceAttempts() { 1407 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1408 } 1409 /** 1410 * Expert: Set the number of maximum attempts that will be made to run a 1411 * reduce task. 1412 * 1413 * @param n the number of attempts per reduce task. 1414 */ 1415 public void setMaxReduceAttempts(int n) { 1416 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1417 } 1418 1419 /** 1420 * Get the user-specified job name. This is only used to identify the 1421 * job to the user. 1422 * 1423 * @return the job's name, defaulting to "". 1424 */ 1425 public String getJobName() { 1426 return get(JobContext.JOB_NAME, ""); 1427 } 1428 1429 /** 1430 * Set the user-specified job name. 1431 * 1432 * @param name the job's new name. 1433 */ 1434 public void setJobName(String name) { 1435 set(JobContext.JOB_NAME, name); 1436 } 1437 1438 /** 1439 * Get the user-specified session identifier. The default is the empty string. 1440 * 1441 * The session identifier is used to tag metric data that is reported to some 1442 * performance metrics system via the org.apache.hadoop.metrics API. The 1443 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1444 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1445 * HOD will set the session identifier by modifying the mapred-site.xml file 1446 * before starting the cluster. 1447 * 1448 * When not running under HOD, this identifer is expected to remain set to 1449 * the empty string. 1450 * 1451 * @return the session identifier, defaulting to "". 1452 */ 1453 @Deprecated 1454 public String getSessionId() { 1455 return get("session.id", ""); 1456 } 1457 1458 /** 1459 * Set the user-specified session identifier. 1460 * 1461 * @param sessionId the new session id. 
1462 */ 1463 @Deprecated 1464 public void setSessionId(String sessionId) { 1465 set("session.id", sessionId); 1466 } 1467 1468 /** 1469 * Set the maximum no. of failures of a given job per tasktracker. 1470 * If the no. of task failures exceeds <code>noFailures</code>, the 1471 * tasktracker is <i>blacklisted</i> for this job. 1472 * 1473 * @param noFailures maximum no. of failures of a given job per tasktracker. 1474 */ 1475 public void setMaxTaskFailuresPerTracker(int noFailures) { 1476 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1477 } 1478 1479 /** 1480 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1481 * If the no. of task failures exceeds this, the tasktracker is 1482 * <i>blacklisted</i> for this job. 1483 * 1484 * @return the maximum no. of failures of a given job per tasktracker. 1485 */ 1486 public int getMaxTaskFailuresPerTracker() { 1487 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1488 } 1489 1490 /** 1491 * Get the maximum percentage of map tasks that can fail without 1492 * the job being aborted. 1493 * 1494 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1495 * attempts before being declared as <i>failed</i>. 1496 * 1497 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1498 * the job being declared as {@link JobStatus#FAILED}. 1499 * 1500 * @return the maximum percentage of map tasks that can fail without 1501 * the job being aborted. 1502 */ 1503 public int getMaxMapTaskFailuresPercent() { 1504 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1505 } 1506 1507 /** 1508 * Expert: Set the maximum percentage of map tasks that can fail without the 1509 * job being aborted. 1510 * 1511 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1512 * before being declared as <i>failed</i>. 1513 * 1514 * @param percent the maximum percentage of map tasks that can fail without 1515 * the job being aborted. 
1516 */ 1517 public void setMaxMapTaskFailuresPercent(int percent) { 1518 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1519 } 1520 1521 /** 1522 * Get the maximum percentage of reduce tasks that can fail without 1523 * the job being aborted. 1524 * 1525 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1526 * attempts before being declared as <i>failed</i>. 1527 * 1528 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1529 * in the job being declared as {@link JobStatus#FAILED}. 1530 * 1531 * @return the maximum percentage of reduce tasks that can fail without 1532 * the job being aborted. 1533 */ 1534 public int getMaxReduceTaskFailuresPercent() { 1535 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1536 } 1537 1538 /** 1539 * Set the maximum percentage of reduce tasks that can fail without the job 1540 * being aborted. 1541 * 1542 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1543 * attempts before being declared as <i>failed</i>. 1544 * 1545 * @param percent the maximum percentage of reduce tasks that can fail without 1546 * the job being aborted. 1547 */ 1548 public void setMaxReduceTaskFailuresPercent(int percent) { 1549 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1550 } 1551 1552 /** 1553 * Set {@link JobPriority} for this job. 1554 * 1555 * @param prio the {@link JobPriority} for this job. 1556 */ 1557 public void setJobPriority(JobPriority prio) { 1558 set(JobContext.PRIORITY, prio.toString()); 1559 } 1560 1561 /** 1562 * Get the {@link JobPriority} for this job. 1563 * 1564 * @return the {@link JobPriority} for this job. 1565 */ 1566 public JobPriority getJobPriority() { 1567 String prio = get(JobContext.PRIORITY); 1568 if(prio == null) { 1569 return JobPriority.NORMAL; 1570 } 1571 1572 return JobPriority.valueOf(prio); 1573 } 1574 1575 /** 1576 * Set JobSubmitHostName for this job. 
1577 * 1578 * @param hostname the JobSubmitHostName for this job. 1579 */ 1580 void setJobSubmitHostName(String hostname) { 1581 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1582 } 1583 1584 /** 1585 * Get the JobSubmitHostName for this job. 1586 * 1587 * @return the JobSubmitHostName for this job. 1588 */ 1589 String getJobSubmitHostName() { 1590 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1591 1592 return hostname; 1593 } 1594 1595 /** 1596 * Set JobSubmitHostAddress for this job. 1597 * 1598 * @param hostadd the JobSubmitHostAddress for this job. 1599 */ 1600 void setJobSubmitHostAddress(String hostadd) { 1601 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1602 } 1603 1604 /** 1605 * Get JobSubmitHostAddress for this job. 1606 * 1607 * @return JobSubmitHostAddress for this job. 1608 */ 1609 String getJobSubmitHostAddress() { 1610 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1611 1612 return hostadd; 1613 } 1614 1615 /** 1616 * Get whether the task profiling is enabled. 1617 * @return true if some tasks will be profiled 1618 */ 1619 public boolean getProfileEnabled() { 1620 return getBoolean(JobContext.TASK_PROFILE, false); 1621 } 1622 1623 /** 1624 * Set whether the system should collect profiler information for some of 1625 * the tasks in this job? The information is stored in the user log 1626 * directory. 1627 * @param newValue true means it should be gathered 1628 */ 1629 public void setProfileEnabled(boolean newValue) { 1630 setBoolean(JobContext.TASK_PROFILE, newValue); 1631 } 1632 1633 /** 1634 * Get the profiler configuration arguments. 
1635 * 1636 * The default value for this property is 1637 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1638 * 1639 * @return the parameters to pass to the task child to configure profiling 1640 */ 1641 public String getProfileParams() { 1642 return get(JobContext.TASK_PROFILE_PARAMS, 1643 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," + 1644 "verbose=n,file=%s"); 1645 } 1646 1647 /** 1648 * Set the profiler configuration arguments. If the string contains a '%s' it 1649 * will be replaced with the name of the profiling output file when the task 1650 * runs. 1651 * 1652 * This value is passed to the task child JVM on the command line. 1653 * 1654 * @param value the configuration string 1655 */ 1656 public void setProfileParams(String value) { 1657 set(JobContext.TASK_PROFILE_PARAMS, value); 1658 } 1659 1660 /** 1661 * Get the range of maps or reduces to profile. 1662 * @param isMap is the task a map? 1663 * @return the task ranges 1664 */ 1665 public IntegerRanges getProfileTaskRange(boolean isMap) { 1666 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1667 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1668 } 1669 1670 /** 1671 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1672 * must also be called. 1673 * @param newValue a set of integer ranges of the map ids 1674 */ 1675 public void setProfileTaskRange(boolean isMap, String newValue) { 1676 // parse the value to make sure it is legal 1677 new Configuration.IntegerRanges(newValue); 1678 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1679 newValue); 1680 } 1681 1682 /** 1683 * Set the debug script to run when the map tasks fail. 1684 * 1685 * <p>The debug script can aid debugging of failed map tasks. 
The script is 1686 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1687 * 1688 * <p>The debug command, run on the node where the map failed, is:</p> 1689 * <p><pre><blockquote> 1690 * $script $stdout $stderr $syslog $jobconf. 1691 * </blockquote></pre></p> 1692 * 1693 * <p> The script file is distributed through {@link DistributedCache} 1694 * APIs. The script needs to be symlinked. </p> 1695 * 1696 * <p>Here is an example on how to submit a script 1697 * <p><blockquote><pre> 1698 * job.setMapDebugScript("./myscript"); 1699 * DistributedCache.createSymlink(job); 1700 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1701 * </pre></blockquote></p> 1702 * 1703 * @param mDbgScript the script name 1704 */ 1705 public void setMapDebugScript(String mDbgScript) { 1706 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1707 } 1708 1709 /** 1710 * Get the map task's debug script. 1711 * 1712 * @return the debug Script for the mapred job for failed map tasks. 1713 * @see #setMapDebugScript(String) 1714 */ 1715 public String getMapDebugScript() { 1716 return get(JobContext.MAP_DEBUG_SCRIPT); 1717 } 1718 1719 /** 1720 * Set the debug script to run when the reduce tasks fail. 1721 * 1722 * <p>The debug script can aid debugging of failed reduce tasks. The script 1723 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1724 * 1725 * <p>The debug command, run on the node where the map failed, is:</p> 1726 * <p><pre><blockquote> 1727 * $script $stdout $stderr $syslog $jobconf. 1728 * </blockquote></pre></p> 1729 * 1730 * <p> The script file is distributed through {@link DistributedCache} 1731 * APIs. 
The script file needs to be symlinked </p> 1732 * 1733 * <p>Here is an example on how to submit a script 1734 * <p><blockquote><pre> 1735 * job.setReduceDebugScript("./myscript"); 1736 * DistributedCache.createSymlink(job); 1737 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1738 * </pre></blockquote></p> 1739 * 1740 * @param rDbgScript the script name 1741 */ 1742 public void setReduceDebugScript(String rDbgScript) { 1743 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1744 } 1745 1746 /** 1747 * Get the reduce task's debug Script 1748 * 1749 * @return the debug script for the mapred job for failed reduce tasks. 1750 * @see #setReduceDebugScript(String) 1751 */ 1752 public String getReduceDebugScript() { 1753 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1754 } 1755 1756 /** 1757 * Get the uri to be invoked in-order to send a notification after the job 1758 * has completed (success/failure). 1759 * 1760 * @return the job end notification uri, <code>null</code> if it hasn't 1761 * been set. 1762 * @see #setJobEndNotificationURI(String) 1763 */ 1764 public String getJobEndNotificationURI() { 1765 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1766 } 1767 1768 /** 1769 * Set the uri to be invoked in-order to send a notification after the job 1770 * has completed (success/failure). 1771 * 1772 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1773 * <tt>$jobStatus</tt>. 
Those, if present, are replaced by the job's 1774 * identifier and completion-status respectively.</p> 1775 * 1776 * <p>This is typically used by application-writers to implement chaining of 1777 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1778 * 1779 * @param uri the job end notification uri 1780 * @see JobStatus 1781 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html# 1782 * JobCompletionAndChaining">Job Completion and Chaining</a> 1783 */ 1784 public void setJobEndNotificationURI(String uri) { 1785 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1786 } 1787 1788 /** 1789 * Get job-specific shared directory for use as scratch space 1790 * 1791 * <p> 1792 * When a job starts, a shared directory is created at location 1793 * <code> 1794 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1795 * This directory is exposed to the users through 1796 * <code>mapreduce.job.local.dir </code>. 1797 * So, the tasks can use this space 1798 * as scratch space and share files among them. </p> 1799 * This value is available as System property also. 1800 * 1801 * @return The localized job specific shared directory 1802 */ 1803 public String getJobLocalDir() { 1804 return get(JobContext.JOB_LOCAL_DIR); 1805 } 1806 1807 /** 1808 * Get memory required to run a map task of the job, in MB. 1809 * 1810 * If a value is specified in the configuration, it is returned. 1811 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1812 * <p/> 1813 * For backward compatibility, if the job configuration sets the 1814 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1815 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1816 * after converting it from bytes to MB. 1817 * @return memory required to run a map task of the job, in MB, 1818 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 
1819 */ 1820 public long getMemoryForMapTask() { 1821 long value = getDeprecatedMemoryValue(); 1822 if (value == DISABLED_MEMORY_LIMIT) { 1823 value = normalizeMemoryConfigValue( 1824 getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, 1825 DISABLED_MEMORY_LIMIT)); 1826 } 1827 // In case that M/R 1.x applications use the old property name 1828 if (value == DISABLED_MEMORY_LIMIT) { 1829 value = normalizeMemoryConfigValue( 1830 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 1831 DISABLED_MEMORY_LIMIT)); 1832 } 1833 return value; 1834 } 1835 1836 public void setMemoryForMapTask(long mem) { 1837 setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1838 // In case that M/R 1.x applications use the old property name 1839 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1840 } 1841 1842 /** 1843 * Get memory required to run a reduce task of the job, in MB. 1844 * 1845 * If a value is specified in the configuration, it is returned. 1846 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1847 * <p/> 1848 * For backward compatibility, if the job configuration sets the 1849 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1850 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1851 * after converting it from bytes to MB. 1852 * @return memory required to run a reduce task of the job, in MB, 1853 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 
1854 */ 1855 public long getMemoryForReduceTask() { 1856 long value = getDeprecatedMemoryValue(); 1857 if (value == DISABLED_MEMORY_LIMIT) { 1858 value = normalizeMemoryConfigValue( 1859 getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, 1860 DISABLED_MEMORY_LIMIT)); 1861 } 1862 // In case that M/R 1.x applications use the old property name 1863 if (value == DISABLED_MEMORY_LIMIT) { 1864 value = normalizeMemoryConfigValue( 1865 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 1866 DISABLED_MEMORY_LIMIT)); 1867 } 1868 return value; 1869 } 1870 1871 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1872 // converted into MBs. 1873 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1874 // value. 1875 private long getDeprecatedMemoryValue() { 1876 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1877 DISABLED_MEMORY_LIMIT); 1878 oldValue = normalizeMemoryConfigValue(oldValue); 1879 if (oldValue != DISABLED_MEMORY_LIMIT) { 1880 oldValue /= (1024*1024); 1881 } 1882 return oldValue; 1883 } 1884 1885 public void setMemoryForReduceTask(long mem) { 1886 setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1887 // In case that M/R 1.x applications use the old property name 1888 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1889 } 1890 1891 /** 1892 * Return the name of the queue to which this job is submitted. 1893 * Defaults to 'default'. 1894 * 1895 * @return name of the queue 1896 */ 1897 public String getQueueName() { 1898 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1899 } 1900 1901 /** 1902 * Set the name of the queue to which this job should be submitted. 
1903 * 1904 * @param queueName Name of the queue 1905 */ 1906 public void setQueueName(String queueName) { 1907 set(JobContext.QUEUE_NAME, queueName); 1908 } 1909 1910 /** 1911 * Normalize the negative values in configuration 1912 * 1913 * @param val 1914 * @return normalized value 1915 */ 1916 public static long normalizeMemoryConfigValue(long val) { 1917 if (val < 0) { 1918 val = DISABLED_MEMORY_LIMIT; 1919 } 1920 return val; 1921 } 1922 1923 /** 1924 * Compute the number of slots required to run a single map task-attempt 1925 * of this job. 1926 * @param slotSizePerMap cluster-wide value of the amount of memory required 1927 * to run a map-task 1928 * @return the number of slots required to run a single map task-attempt 1929 * 1 if memory parameters are disabled. 1930 */ 1931 int computeNumSlotsPerMap(long slotSizePerMap) { 1932 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) || 1933 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) { 1934 return 1; 1935 } 1936 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap)); 1937 } 1938 1939 /** 1940 * Compute the number of slots required to run a single reduce task-attempt 1941 * of this job. 1942 * @param slotSizePerReduce cluster-wide value of the amount of memory 1943 * required to run a reduce-task 1944 * @return the number of slots required to run a single reduce task-attempt 1945 * 1 if memory parameters are disabled 1946 */ 1947 int computeNumSlotsPerReduce(long slotSizePerReduce) { 1948 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) || 1949 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) { 1950 return 1; 1951 } 1952 return 1953 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce)); 1954 } 1955 1956 /** 1957 * Find a jar that contains a class of the same name, if any. 1958 * It will return a jar file, even if that is not the first thing 1959 * on the class path that has a class with the same name. 1960 * 1961 * @param my_class the class to find. 
1962 * @return a jar file that contains the class, or null. 1963 * @throws IOException 1964 */ 1965 public static String findContainingJar(Class my_class) { 1966 return ClassUtil.findContainingJar(my_class); 1967 } 1968 1969 /** 1970 * Get the memory required to run a task of this job, in bytes. See 1971 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1972 * <p/> 1973 * This method is deprecated. Now, different memory limits can be 1974 * set for map and reduce tasks of a job, in MB. 1975 * <p/> 1976 * For backward compatibility, if the job configuration sets the 1977 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1978 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 1979 * Otherwise, this method will return the larger of the values returned by 1980 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 1981 * after converting them into bytes. 1982 * 1983 * @return Memory required to run a task of this job, in bytes, 1984 * or {@link #DISABLED_MEMORY_LIMIT}, if unset. 1985 * @see #setMaxVirtualMemoryForTask(long) 1986 * @deprecated Use {@link #getMemoryForMapTask()} and 1987 * {@link #getMemoryForReduceTask()} 1988 */ 1989 @Deprecated 1990 public long getMaxVirtualMemoryForTask() { 1991 LOG.warn( 1992 "getMaxVirtualMemoryForTask() is deprecated. " + 1993 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 1994 1995 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT); 1996 value = normalizeMemoryConfigValue(value); 1997 if (value == DISABLED_MEMORY_LIMIT) { 1998 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask()); 1999 value = normalizeMemoryConfigValue(value); 2000 if (value != DISABLED_MEMORY_LIMIT) { 2001 value *= 1024*1024; 2002 } 2003 } 2004 return value; 2005 } 2006 2007 /** 2008 * Set the maximum amount of memory any task of this job can use. 
See 2009 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 2010 * <p/> 2011 * mapred.task.maxvmem is split into 2012 * mapreduce.map.memory.mb 2013 * and mapreduce.map.memory.mb,mapred 2014 * each of the new key are set 2015 * as mapred.task.maxvmem / 1024 2016 * as new values are in MB 2017 * 2018 * @param vmem Maximum amount of virtual memory in bytes any task of this job 2019 * can use. 2020 * @see #getMaxVirtualMemoryForTask() 2021 * @deprecated 2022 * Use {@link #setMemoryForMapTask(long mem)} and 2023 * Use {@link #setMemoryForReduceTask(long mem)} 2024 */ 2025 @Deprecated 2026 public void setMaxVirtualMemoryForTask(long vmem) { 2027 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 2028 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 2029 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) { 2030 setMemoryForMapTask(DISABLED_MEMORY_LIMIT); 2031 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT); 2032 } 2033 2034 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 2035 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 2036 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 2037 }else{ 2038 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 2039 } 2040 } 2041 2042 /** 2043 * @deprecated this variable is deprecated and nolonger in use. 2044 */ 2045 @Deprecated 2046 public long getMaxPhysicalMemoryForTask() { 2047 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 2048 + " Refer to the APIs getMemoryForMapTask() and" 2049 + " getMemoryForReduceTask() for details."); 2050 return -1; 2051 } 2052 2053 /* 2054 * @deprecated this 2055 */ 2056 @Deprecated 2057 public void setMaxPhysicalMemoryForTask(long mem) { 2058 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 2059 + " The value set is ignored. 
Refer to " 2060 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 2061 } 2062 2063 static String deprecatedString(String key) { 2064 return "The variable " + key + " is no longer used."; 2065 } 2066 2067 private void checkAndWarnDeprecation() { 2068 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 2069 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 2070 + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY 2071 + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY); 2072 } 2073 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 2074 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 2075 } 2076 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 2077 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 2078 } 2079 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 2080 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 2081 } 2082 } 2083 2084 2085 } 2086