001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021 022import java.io.IOException; 023import java.net.URL; 024import java.net.URLDecoder; 025import java.util.Enumeration; 026import java.util.regex.Pattern; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.io.LongWritable; 037import org.apache.hadoop.io.RawComparator; 038import org.apache.hadoop.io.Text; 039import org.apache.hadoop.io.WritableComparable; 040import org.apache.hadoop.io.WritableComparator; 041import org.apache.hadoop.io.compress.CompressionCodec; 042import org.apache.hadoop.mapred.lib.HashPartitioner; 043import org.apache.hadoop.mapred.lib.IdentityMapper; 044import org.apache.hadoop.mapred.lib.IdentityReducer; 045import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator; 046import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner; 047import org.apache.hadoop.mapreduce.MRConfig; 048import org.apache.hadoop.mapreduce.MRJobConfig; 049import org.apache.hadoop.mapreduce.filecache.DistributedCache; 050import org.apache.hadoop.mapreduce.util.ConfigUtil; 051import org.apache.hadoop.security.Credentials; 052import org.apache.hadoop.util.ReflectionUtils; 053import org.apache.hadoop.util.Tool; 054import org.apache.log4j.Level; 055 056/** 057 * A map/reduce job configuration. 058 * 059 * <p><code>JobConf</code> is the primary interface for a user to describe a 060 * map-reduce job to the Hadoop framework for execution. The framework tries to 061 * faithfully execute the job as-is described by <code>JobConf</code>, however: 062 * <ol> 063 * <li> 064 * Some configuration parameters might have been marked as 065 * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams"> 066 * final</a> by administrators and hence cannot be altered. 067 * </li> 068 * <li> 069 * While some job parameters are straight-forward to set 070 * (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 071 * rest of the framework and/or job-configuration and is relatively more 072 * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}). 073 * </li> 074 * </ol></p> 075 * 076 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 077 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 078 * {@link OutputFormat} implementations to be used etc. 079 * 080 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 081 * of the job such as <code>Comparator</code>s to be used, files to be put in 082 * the {@link DistributedCache}, whether or not intermediate and/or job outputs 083 * are to be compressed (and how), debugability via user-provided scripts 084 * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}), 085 * for doing post-processing on task logs, task's stdout, stderr, syslog. 086 * and etc.</p> 087 * 088 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p> 089 * <p><blockquote><pre> 090 * // Create a new JobConf 091 * JobConf job = new JobConf(new Configuration(), MyJob.class); 092 * 093 * // Specify various job-specific parameters 094 * job.setJobName("myjob"); 095 * 096 * FileInputFormat.setInputPaths(job, new Path("in")); 097 * FileOutputFormat.setOutputPath(job, new Path("out")); 098 * 099 * job.setMapperClass(MyJob.MyMapper.class); 100 * job.setCombinerClass(MyJob.MyReducer.class); 101 * job.setReducerClass(MyJob.MyReducer.class); 102 * 103 * job.setInputFormat(SequenceFileInputFormat.class); 104 * job.setOutputFormat(SequenceFileOutputFormat.class); 105 * </pre></blockquote></p> 106 * 107 * @see JobClient 108 * @see ClusterStatus 109 * @see Tool 110 * @see DistributedCache 111 */ 112@InterfaceAudience.Public 113@InterfaceStability.Stable 114public class JobConf extends Configuration { 115 116 private static final Log LOG = LogFactory.getLog(JobConf.class); 117 118 static{ 119 ConfigUtil.loadResources(); 120 } 121 122 /** 123 * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and 124 * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY} 125 */ 126 @Deprecated 127 public static final String MAPRED_TASK_MAXVMEM_PROPERTY = 128 "mapred.task.maxvmem"; 129 130 /** 131 * @deprecated 132 */ 133 @Deprecated 134 public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = 135 "mapred.task.limit.maxvmem"; 136 137 /** 138 * @deprecated 139 */ 140 @Deprecated 141 public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = 142 "mapred.task.default.maxvmem"; 143 144 /** 145 * @deprecated 146 */ 147 @Deprecated 148 public static final String MAPRED_TASK_MAXPMEM_PROPERTY = 149 "mapred.task.maxpmem"; 150 151 /** 152 * A value which if set for memory related configuration options, 153 * indicates that the options are turned off. 154 */ 155 public static final long DISABLED_MEMORY_LIMIT = -1L; 156 157 /** 158 * Property name for the configuration property mapreduce.cluster.local.dir 159 */ 160 public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR; 161 162 /** 163 * Name of the queue to which jobs will be submitted, if no queue 164 * name is mentioned. 165 */ 166 public static final String DEFAULT_QUEUE_NAME = "default"; 167 168 static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 169 JobContext.MAP_MEMORY_MB; 170 171 static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = 172 JobContext.REDUCE_MEMORY_MB; 173 174 /** Pattern for the default unpacking behavior for job jars */ 175 public static final Pattern UNPACK_JAR_PATTERN_DEFAULT = 176 Pattern.compile("(?:classes/|lib/).*"); 177 178 /** 179 * Configuration key to set the java command line options for the child 180 * map and reduce tasks. 181 * 182 * Java opts for the task tracker child processes. 183 * The following symbol, if present, will be interpolated: @taskid@. 184 * It is replaced by current TaskID. Any other occurrences of '@' will go 185 * unchanged. 186 * For example, to enable verbose gc logging to a file named for the taskid in 187 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 188 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 189 * 190 * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 191 * other environment variables to the child processes. 192 * 193 * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 194 * {@link #MAPRED_REDUCE_TASK_JAVA_OPTS} 195 */ 196 @Deprecated 197 public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts"; 198 199 /** 200 * Configuration key to set the java command line options for the map tasks. 201 * 202 * Java opts for the task tracker child map processes. 203 * The following symbol, if present, will be interpolated: @taskid@. 204 * It is replaced by current TaskID. Any other occurrences of '@' will go 205 * unchanged. 206 * For example, to enable verbose gc logging to a file named for the taskid in 207 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 208 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 209 * 210 * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 211 * other environment variables to the map processes. 212 */ 213 public static final String MAPRED_MAP_TASK_JAVA_OPTS = 214 JobContext.MAP_JAVA_OPTS; 215 216 /** 217 * Configuration key to set the java command line options for the reduce tasks. 218 * 219 * Java opts for the task tracker child reduce processes. 220 * The following symbol, if present, will be interpolated: @taskid@. 221 * It is replaced by current TaskID. Any other occurrences of '@' will go 222 * unchanged. 223 * For example, to enable verbose gc logging to a file named for the taskid in 224 * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: 225 * -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc 226 * 227 * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 228 * pass process environment variables to the reduce processes. 229 */ 230 public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 231 JobContext.REDUCE_JAVA_OPTS; 232 233 public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m"; 234 235 /** 236 * @deprecated 237 * Configuration key to set the maximum virtual memory available to the child 238 * map and reduce tasks (in kilo-bytes). This has been deprecated and will no 239 * longer have any effect. 240 */ 241 @Deprecated 242 public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit"; 243 244 /** 245 * @deprecated 246 * Configuration key to set the maximum virtual memory available to the 247 * map tasks (in kilo-bytes). This has been deprecated and will no 248 * longer have any effect. 249 */ 250 @Deprecated 251 public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit"; 252 253 /** 254 * @deprecated 255 * Configuration key to set the maximum virtual memory available to the 256 * reduce tasks (in kilo-bytes). This has been deprecated and will no 257 * longer have any effect. 258 */ 259 @Deprecated 260 public static final String MAPRED_REDUCE_TASK_ULIMIT = 261 "mapreduce.reduce.ulimit"; 262 263 264 /** 265 * Configuration key to set the environment of the child map/reduce tasks. 266 * 267 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 268 * reference existing environment variables via <code>$key</code>. 269 * 270 * Example: 271 * <ul> 272 * <li> A=foo - This will set the env variable A to foo. </li> 273 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 274 * </ul> 275 * 276 * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 277 * {@link #MAPRED_REDUCE_TASK_ENV} 278 */ 279 @Deprecated 280 public static final String MAPRED_TASK_ENV = "mapred.child.env"; 281 282 /** 283 * Configuration key to set the maximum virutal memory available to the 284 * map tasks. 285 * 286 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 287 * reference existing environment variables via <code>$key</code>. 288 * 289 * Example: 290 * <ul> 291 * <li> A=foo - This will set the env variable A to foo. </li> 292 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 293 * </ul> 294 */ 295 public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV; 296 297 /** 298 * Configuration key to set the maximum virutal memory available to the 299 * reduce tasks. 300 * 301 * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 302 * reference existing environment variables via <code>$key</code>. 303 * 304 * Example: 305 * <ul> 306 * <li> A=foo - This will set the env variable A to foo. </li> 307 * <li> B=$X:c This is inherit tasktracker's X env variable. </li> 308 * </ul> 309 */ 310 public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV; 311 312 private Credentials credentials = new Credentials(); 313 314 /** 315 * Configuration key to set the logging {@link Level} for the map task. 316 * 317 * The allowed logging levels are: 318 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 319 */ 320 public static final String MAPRED_MAP_TASK_LOG_LEVEL = 321 JobContext.MAP_LOG_LEVEL; 322 323 /** 324 * Configuration key to set the logging {@link Level} for the reduce task. 325 * 326 * The allowed logging levels are: 327 * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL. 328 */ 329 public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 330 JobContext.REDUCE_LOG_LEVEL; 331 332 /** 333 * Default logging level for map/reduce tasks. 334 */ 335 public static final Level DEFAULT_LOG_LEVEL = Level.INFO; 336 337 338 /** 339 * Construct a map/reduce job configuration. 340 */ 341 public JobConf() { 342 checkAndWarnDeprecation(); 343 } 344 345 /** 346 * Construct a map/reduce job configuration. 347 * 348 * @param exampleClass a class whose containing jar is used as the job's jar. 349 */ 350 public JobConf(Class exampleClass) { 351 setJarByClass(exampleClass); 352 checkAndWarnDeprecation(); 353 } 354 355 /** 356 * Construct a map/reduce job configuration. 357 * 358 * @param conf a Configuration whose settings will be inherited. 359 */ 360 public JobConf(Configuration conf) { 361 super(conf); 362 363 if (conf instanceof JobConf) { 364 JobConf that = (JobConf)conf; 365 credentials = that.credentials; 366 } 367 368 checkAndWarnDeprecation(); 369 } 370 371 372 /** Construct a map/reduce job configuration. 373 * 374 * @param conf a Configuration whose settings will be inherited. 375 * @param exampleClass a class whose containing jar is used as the job's jar. 376 */ 377 public JobConf(Configuration conf, Class exampleClass) { 378 this(conf); 379 setJarByClass(exampleClass); 380 } 381 382 383 /** Construct a map/reduce configuration. 384 * 385 * @param config a Configuration-format XML job description file. 386 */ 387 public JobConf(String config) { 388 this(new Path(config)); 389 } 390 391 /** Construct a map/reduce configuration. 392 * 393 * @param config a Configuration-format XML job description file. 394 */ 395 public JobConf(Path config) { 396 super(); 397 addResource(config); 398 checkAndWarnDeprecation(); 399 } 400 401 /** A new map/reduce configuration where the behavior of reading from the 402 * default resources can be turned off. 403 * <p/> 404 * If the parameter {@code loadDefaults} is false, the new instance 405 * will not load resources from the default files. 406 * 407 * @param loadDefaults specifies whether to load from the default files 408 */ 409 public JobConf(boolean loadDefaults) { 410 super(loadDefaults); 411 checkAndWarnDeprecation(); 412 } 413 414 /** 415 * Get credentials for the job. 416 * @return credentials for the job 417 */ 418 public Credentials getCredentials() { 419 return credentials; 420 } 421 422 void setCredentials(Credentials credentials) { 423 this.credentials = credentials; 424 } 425 426 /** 427 * Get the user jar for the map-reduce job. 428 * 429 * @return the user jar for the map-reduce job. 430 */ 431 public String getJar() { return get(JobContext.JAR); } 432 433 /** 434 * Set the user jar for the map-reduce job. 435 * 436 * @param jar the user jar for the map-reduce job. 437 */ 438 public void setJar(String jar) { set(JobContext.JAR, jar); } 439 440 /** 441 * Get the pattern for jar contents to unpack on the tasktracker 442 */ 443 public Pattern getJarUnpackPattern() { 444 return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT); 445 } 446 447 448 /** 449 * Set the job's jar file by finding an example class location. 450 * 451 * @param cls the example class. 452 */ 453 public void setJarByClass(Class cls) { 454 String jar = findContainingJar(cls); 455 if (jar != null) { 456 setJar(jar); 457 } 458 } 459 460 public String[] getLocalDirs() throws IOException { 461 return getTrimmedStrings(MRConfig.LOCAL_DIR); 462 } 463 464 /** 465 * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 466 */ 467 @Deprecated 468 public void deleteLocalFiles() throws IOException { 469 String[] localDirs = getLocalDirs(); 470 for (int i = 0; i < localDirs.length; i++) { 471 FileSystem.getLocal(this).delete(new Path(localDirs[i]), true); 472 } 473 } 474 475 public void deleteLocalFiles(String subdir) throws IOException { 476 String[] localDirs = getLocalDirs(); 477 for (int i = 0; i < localDirs.length; i++) { 478 FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true); 479 } 480 } 481 482 /** 483 * Constructs a local file name. Files are distributed among configured 484 * local directories. 485 */ 486 public Path getLocalPath(String pathString) throws IOException { 487 return getLocalPath(MRConfig.LOCAL_DIR, pathString); 488 } 489 490 /** 491 * Get the reported username for this job. 492 * 493 * @return the username 494 */ 495 public String getUser() { 496 return get(JobContext.USER_NAME); 497 } 498 499 /** 500 * Set the reported username for this job. 501 * 502 * @param user the username for this job. 503 */ 504 public void setUser(String user) { 505 set(JobContext.USER_NAME, user); 506 } 507 508 509 510 /** 511 * Set whether the framework should keep the intermediate files for 512 * failed tasks. 513 * 514 * @param keep <code>true</code> if framework should keep the intermediate files 515 * for failed tasks, <code>false</code> otherwise. 516 * 517 */ 518 public void setKeepFailedTaskFiles(boolean keep) { 519 setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep); 520 } 521 522 /** 523 * Should the temporary files for failed tasks be kept? 524 * 525 * @return should the files be kept? 526 */ 527 public boolean getKeepFailedTaskFiles() { 528 return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false); 529 } 530 531 /** 532 * Set a regular expression for task names that should be kept. 533 * The regular expression ".*_m_000123_0" would keep the files 534 * for the first instance of map 123 that ran. 535 * 536 * @param pattern the java.util.regex.Pattern to match against the 537 * task names. 538 */ 539 public void setKeepTaskFilesPattern(String pattern) { 540 set(JobContext.PRESERVE_FILES_PATTERN, pattern); 541 } 542 543 /** 544 * Get the regular expression that is matched against the task names 545 * to see if we need to keep the files. 546 * 547 * @return the pattern as a string, if it was set, othewise null. 548 */ 549 public String getKeepTaskFilesPattern() { 550 return get(JobContext.PRESERVE_FILES_PATTERN); 551 } 552 553 /** 554 * Set the current working directory for the default file system. 555 * 556 * @param dir the new current working directory. 557 */ 558 public void setWorkingDirectory(Path dir) { 559 dir = new Path(getWorkingDirectory(), dir); 560 set(JobContext.WORKING_DIR, dir.toString()); 561 } 562 563 /** 564 * Get the current working directory for the default file system. 565 * 566 * @return the directory name. 567 */ 568 public Path getWorkingDirectory() { 569 String name = get(JobContext.WORKING_DIR); 570 if (name != null) { 571 return new Path(name); 572 } else { 573 try { 574 Path dir = FileSystem.get(this).getWorkingDirectory(); 575 set(JobContext.WORKING_DIR, dir.toString()); 576 return dir; 577 } catch (IOException e) { 578 throw new RuntimeException(e); 579 } 580 } 581 } 582 583 /** 584 * Sets the number of tasks that a spawned task JVM should run 585 * before it exits 586 * @param numTasks the number of tasks to execute; defaults to 1; 587 * -1 signifies no limit 588 */ 589 public void setNumTasksToExecutePerJvm(int numTasks) { 590 setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks); 591 } 592 593 /** 594 * Get the number of tasks that a spawned JVM should execute 595 */ 596 public int getNumTasksToExecutePerJvm() { 597 return getInt(JobContext.JVM_NUMTASKS_TORUN, 1); 598 } 599 600 /** 601 * Get the {@link InputFormat} implementation for the map-reduce job, 602 * defaults to {@link TextInputFormat} if not specified explicity. 603 * 604 * @return the {@link InputFormat} implementation for the map-reduce job. 605 */ 606 public InputFormat getInputFormat() { 607 return ReflectionUtils.newInstance(getClass("mapred.input.format.class", 608 TextInputFormat.class, 609 InputFormat.class), 610 this); 611 } 612 613 /** 614 * Set the {@link InputFormat} implementation for the map-reduce job. 615 * 616 * @param theClass the {@link InputFormat} implementation for the map-reduce 617 * job. 618 */ 619 public void setInputFormat(Class<? extends InputFormat> theClass) { 620 setClass("mapred.input.format.class", theClass, InputFormat.class); 621 } 622 623 /** 624 * Get the {@link OutputFormat} implementation for the map-reduce job, 625 * defaults to {@link TextOutputFormat} if not specified explicity. 626 * 627 * @return the {@link OutputFormat} implementation for the map-reduce job. 628 */ 629 public OutputFormat getOutputFormat() { 630 return ReflectionUtils.newInstance(getClass("mapred.output.format.class", 631 TextOutputFormat.class, 632 OutputFormat.class), 633 this); 634 } 635 636 /** 637 * Get the {@link OutputCommitter} implementation for the map-reduce job, 638 * defaults to {@link FileOutputCommitter} if not specified explicitly. 639 * 640 * @return the {@link OutputCommitter} implementation for the map-reduce job. 641 */ 642 public OutputCommitter getOutputCommitter() { 643 return (OutputCommitter)ReflectionUtils.newInstance( 644 getClass("mapred.output.committer.class", FileOutputCommitter.class, 645 OutputCommitter.class), this); 646 } 647 648 /** 649 * Set the {@link OutputCommitter} implementation for the map-reduce job. 650 * 651 * @param theClass the {@link OutputCommitter} implementation for the map-reduce 652 * job. 653 */ 654 public void setOutputCommitter(Class<? extends OutputCommitter> theClass) { 655 setClass("mapred.output.committer.class", theClass, OutputCommitter.class); 656 } 657 658 /** 659 * Set the {@link OutputFormat} implementation for the map-reduce job. 660 * 661 * @param theClass the {@link OutputFormat} implementation for the map-reduce 662 * job. 663 */ 664 public void setOutputFormat(Class<? extends OutputFormat> theClass) { 665 setClass("mapred.output.format.class", theClass, OutputFormat.class); 666 } 667 668 /** 669 * Should the map outputs be compressed before transfer? 670 * 671 * @param compress should the map outputs be compressed? 672 */ 673 public void setCompressMapOutput(boolean compress) { 674 setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress); 675 } 676 677 /** 678 * Are the outputs of the maps be compressed? 679 * 680 * @return <code>true</code> if the outputs of the maps are to be compressed, 681 * <code>false</code> otherwise. 682 */ 683 public boolean getCompressMapOutput() { 684 return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false); 685 } 686 687 /** 688 * Set the given class as the {@link CompressionCodec} for the map outputs. 689 * 690 * @param codecClass the {@link CompressionCodec} class that will compress 691 * the map outputs. 692 */ 693 public void 694 setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) { 695 setCompressMapOutput(true); 696 setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 697 CompressionCodec.class); 698 } 699 700 /** 701 * Get the {@link CompressionCodec} for compressing the map outputs. 702 * 703 * @param defaultValue the {@link CompressionCodec} to return if not set 704 * @return the {@link CompressionCodec} class that should be used to compress the 705 * map outputs. 706 * @throws IllegalArgumentException if the class was specified, but not found 707 */ 708 public Class<? extends CompressionCodec> 709 getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) { 710 Class<? extends CompressionCodec> codecClass = defaultValue; 711 String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC); 712 if (name != null) { 713 try { 714 codecClass = getClassByName(name).asSubclass(CompressionCodec.class); 715 } catch (ClassNotFoundException e) { 716 throw new IllegalArgumentException("Compression codec " + name + 717 " was not found.", e); 718 } 719 } 720 return codecClass; 721 } 722 723 /** 724 * Get the key class for the map output data. If it is not set, use the 725 * (final) output key class. This allows the map output key class to be 726 * different than the final output key class. 727 * 728 * @return the map output key class. 729 */ 730 public Class<?> getMapOutputKeyClass() { 731 Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class); 732 if (retv == null) { 733 retv = getOutputKeyClass(); 734 } 735 return retv; 736 } 737 738 /** 739 * Set the key class for the map output data. This allows the user to 740 * specify the map output key class to be different than the final output 741 * value class. 742 * 743 * @param theClass the map output key class. 744 */ 745 public void setMapOutputKeyClass(Class<?> theClass) { 746 setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class); 747 } 748 749 /** 750 * Get the value class for the map output data. If it is not set, use the 751 * (final) output value class This allows the map output value class to be 752 * different than the final output value class. 753 * 754 * @return the map output value class. 755 */ 756 public Class<?> getMapOutputValueClass() { 757 Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null, 758 Object.class); 759 if (retv == null) { 760 retv = getOutputValueClass(); 761 } 762 return retv; 763 } 764 765 /** 766 * Set the value class for the map output data. This allows the user to 767 * specify the map output value class to be different than the final output 768 * value class. 769 * 770 * @param theClass the map output value class. 771 */ 772 public void setMapOutputValueClass(Class<?> theClass) { 773 setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class); 774 } 775 776 /** 777 * Get the key class for the job output data. 778 * 779 * @return the key class for the job output data. 780 */ 781 public Class<?> getOutputKeyClass() { 782 return getClass(JobContext.OUTPUT_KEY_CLASS, 783 LongWritable.class, Object.class); 784 } 785 786 /** 787 * Set the key class for the job output data. 788 * 789 * @param theClass the key class for the job output data. 790 */ 791 public void setOutputKeyClass(Class<?> theClass) { 792 setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class); 793 } 794 795 /** 796 * Get the {@link RawComparator} comparator used to compare keys. 797 * 798 * @return the {@link RawComparator} comparator used to compare keys. 799 */ 800 public RawComparator getOutputKeyComparator() { 801 Class<? extends RawComparator> theClass = getClass( 802 JobContext.KEY_COMPARATOR, null, RawComparator.class); 803 if (theClass != null) 804 return ReflectionUtils.newInstance(theClass, this); 805 return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); 806 } 807 808 /** 809 * Set the {@link RawComparator} comparator used to compare keys. 810 * 811 * @param theClass the {@link RawComparator} comparator used to 812 * compare keys. 813 * @see #setOutputValueGroupingComparator(Class) 814 */ 815 public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) { 816 setClass(JobContext.KEY_COMPARATOR, 817 theClass, RawComparator.class); 818 } 819 820 /** 821 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 822 * 823 * @param keySpec the key specification of the form -k pos1[,pos2], where, 824 * pos is of the form f[.c][opts], where f is the number 825 * of the key field to use, and c is the number of the first character from 826 * the beginning of the field. Fields and character posns are numbered 827 * starting with 1; a character position of zero in pos2 indicates the 828 * field's last character. If '.c' is omitted from pos1, it defaults to 1 829 * (the beginning of the field); if omitted from pos2, it defaults to 0 830 * (the end of the field). opts are ordering options. The supported options 831 * are: 832 * -n, (Sort numerically) 833 * -r, (Reverse the result of comparison) 834 */ 835 public void setKeyFieldComparatorOptions(String keySpec) { 836 setOutputKeyComparatorClass(KeyFieldBasedComparator.class); 837 set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec); 838 } 839 840 /** 841 * Get the {@link KeyFieldBasedComparator} options 842 */ 843 public String getKeyFieldComparatorOption() { 844 return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS); 845 } 846 847 /** 848 * Set the {@link KeyFieldBasedPartitioner} options used for 849 * {@link Partitioner} 850 * 851 * @param keySpec the key specification of the form -k pos1[,pos2], where, 852 * pos is of the form f[.c][opts], where f is the number 853 * of the key field to use, and c is the number of the first character from 854 * the beginning of the field. Fields and character posns are numbered 855 * starting with 1; a character position of zero in pos2 indicates the 856 * field's last character. If '.c' is omitted from pos1, it defaults to 1 857 * (the beginning of the field); if omitted from pos2, it defaults to 0 858 * (the end of the field). 859 */ 860 public void setKeyFieldPartitionerOptions(String keySpec) { 861 setPartitionerClass(KeyFieldBasedPartitioner.class); 862 set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec); 863 } 864 865 /** 866 * Get the {@link KeyFieldBasedPartitioner} options 867 */ 868 public String getKeyFieldPartitionerOption() { 869 return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); 870 } 871 872 /** 873 * Get the user defined {@link WritableComparable} comparator for 874 * grouping keys of inputs to the reduce. 875 * 876 * @return comparator set by the user for grouping values. 877 * @see #setOutputValueGroupingComparator(Class) for details. 878 */ 879 public RawComparator getOutputValueGroupingComparator() { 880 Class<? extends RawComparator> theClass = getClass( 881 JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class); 882 if (theClass == null) { 883 return getOutputKeyComparator(); 884 } 885 886 return ReflectionUtils.newInstance(theClass, this); 887 } 888 889 /** 890 * Set the user defined {@link RawComparator} comparator for 891 * grouping keys in the input to the reduce. 892 * 893 * <p>This comparator should be provided if the equivalence rules for keys 894 * for sorting the intermediates are different from those for grouping keys 895 * before each call to 896 * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> 897 * 898 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed 899 * in a single call to the reduce function if K1 and K2 compare as equal.</p> 900 * 901 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 902 * how keys are sorted, this can be used in conjunction to simulate 903 * <i>secondary sort on values</i>.</p> 904 * 905 * <p><i>Note</i>: This is not a guarantee of the reduce sort being 906 * <i>stable</i> in any sense. (In any case, with the order of available 907 * map-outputs to the reduce being non-deterministic, it wouldn't make 908 * that much sense.)</p> 909 * 910 * @param theClass the comparator class to be used for grouping keys. 911 * It should implement <code>RawComparator</code>. 912 * @see #setOutputKeyComparatorClass(Class) 913 */ 914 public void setOutputValueGroupingComparator( 915 Class<? extends RawComparator> theClass) { 916 setClass(JobContext.GROUP_COMPARATOR_CLASS, 917 theClass, RawComparator.class); 918 } 919 920 /** 921 * Should the framework use the new context-object code for running 922 * the mapper? 923 * @return true, if the new api should be used 924 */ 925 public boolean getUseNewMapper() { 926 return getBoolean("mapred.mapper.new-api", false); 927 } 928 /** 929 * Set whether the framework should use the new api for the mapper. 930 * This is the default for jobs submitted with the new Job api. 931 * @param flag true, if the new api should be used 932 */ 933 public void setUseNewMapper(boolean flag) { 934 setBoolean("mapred.mapper.new-api", flag); 935 } 936 937 /** 938 * Should the framework use the new context-object code for running 939 * the reducer? 940 * @return true, if the new api should be used 941 */ 942 public boolean getUseNewReducer() { 943 return getBoolean("mapred.reducer.new-api", false); 944 } 945 /** 946 * Set whether the framework should use the new api for the reducer. 947 * This is the default for jobs submitted with the new Job api. 948 * @param flag true, if the new api should be used 949 */ 950 public void setUseNewReducer(boolean flag) { 951 setBoolean("mapred.reducer.new-api", flag); 952 } 953 954 /** 955 * Get the value class for job outputs. 956 * 957 * @return the value class for job outputs. 958 */ 959 public Class<?> getOutputValueClass() { 960 return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class); 961 } 962 963 /** 964 * Set the value class for job outputs. 965 * 966 * @param theClass the value class for job outputs. 967 */ 968 public void setOutputValueClass(Class<?> theClass) { 969 setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class); 970 } 971 972 /** 973 * Get the {@link Mapper} class for the job. 974 * 975 * @return the {@link Mapper} class for the job. 976 */ 977 public Class<? extends Mapper> getMapperClass() { 978 return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class); 979 } 980 981 /** 982 * Set the {@link Mapper} class for the job. 983 * 984 * @param theClass the {@link Mapper} class for the job. 985 */ 986 public void setMapperClass(Class<? extends Mapper> theClass) { 987 setClass("mapred.mapper.class", theClass, Mapper.class); 988 } 989 990 /** 991 * Get the {@link MapRunnable} class for the job. 992 * 993 * @return the {@link MapRunnable} class for the job. 994 */ 995 public Class<? extends MapRunnable> getMapRunnerClass() { 996 return getClass("mapred.map.runner.class", 997 MapRunner.class, MapRunnable.class); 998 } 999 1000 /** 1001 * Expert: Set the {@link MapRunnable} class for the job. 1002 * 1003 * Typically used to exert greater control on {@link Mapper}s. 1004 * 1005 * @param theClass the {@link MapRunnable} class for the job. 1006 */ 1007 public void setMapRunnerClass(Class<? extends MapRunnable> theClass) { 1008 setClass("mapred.map.runner.class", theClass, MapRunnable.class); 1009 } 1010 1011 /** 1012 * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 1013 * to be sent to the {@link Reducer}s. 1014 * 1015 * @return the {@link Partitioner} used to partition map-outputs. 1016 */ 1017 public Class<? extends Partitioner> getPartitionerClass() { 1018 return getClass("mapred.partitioner.class", 1019 HashPartitioner.class, Partitioner.class); 1020 } 1021 1022 /** 1023 * Set the {@link Partitioner} class used to partition 1024 * {@link Mapper}-outputs to be sent to the {@link Reducer}s. 1025 * 1026 * @param theClass the {@link Partitioner} used to partition map-outputs. 1027 */ 1028 public void setPartitionerClass(Class<? extends Partitioner> theClass) { 1029 setClass("mapred.partitioner.class", theClass, Partitioner.class); 1030 } 1031 1032 /** 1033 * Get the {@link Reducer} class for the job. 1034 * 1035 * @return the {@link Reducer} class for the job. 1036 */ 1037 public Class<? extends Reducer> getReducerClass() { 1038 return getClass("mapred.reducer.class", 1039 IdentityReducer.class, Reducer.class); 1040 } 1041 1042 /** 1043 * Set the {@link Reducer} class for the job. 1044 * 1045 * @param theClass the {@link Reducer} class for the job. 1046 */ 1047 public void setReducerClass(Class<? extends Reducer> theClass) { 1048 setClass("mapred.reducer.class", theClass, Reducer.class); 1049 } 1050 1051 /** 1052 * Get the user-defined <i>combiner</i> class used to combine map-outputs 1053 * before being sent to the reducers. Typically the combiner is same as the 1054 * the {@link Reducer} for the job i.e. {@link #getReducerClass()}. 1055 * 1056 * @return the user-defined combiner class used to combine map-outputs. 1057 */ 1058 public Class<? extends Reducer> getCombinerClass() { 1059 return getClass("mapred.combiner.class", null, Reducer.class); 1060 } 1061 1062 /** 1063 * Set the user-defined <i>combiner</i> class used to combine map-outputs 1064 * before being sent to the reducers. 1065 * 1066 * <p>The combiner is an application-specified aggregation operation, which 1067 * can help cut down the amount of data transferred between the 1068 * {@link Mapper} and the {@link Reducer}, leading to better performance.</p> 1069 * 1070 * <p>The framework may invoke the combiner 0, 1, or multiple times, in both 1071 * the mapper and reducer tasks. In general, the combiner is called as the 1072 * sort/merge result is written to disk. The combiner must: 1073 * <ul> 1074 * <li> be side-effect free</li> 1075 * <li> have the same input and output key types and the same input and 1076 * output value types</li> 1077 * </ul></p> 1078 * 1079 * <p>Typically the combiner is same as the <code>Reducer</code> for the 1080 * job i.e. {@link #setReducerClass(Class)}.</p> 1081 * 1082 * @param theClass the user-defined combiner class used to combine 1083 * map-outputs. 1084 */ 1085 public void setCombinerClass(Class<? extends Reducer> theClass) { 1086 setClass("mapred.combiner.class", theClass, Reducer.class); 1087 } 1088 1089 /** 1090 * Should speculative execution be used for this job? 1091 * Defaults to <code>true</code>. 1092 * 1093 * @return <code>true</code> if speculative execution be used for this job, 1094 * <code>false</code> otherwise. 1095 */ 1096 public boolean getSpeculativeExecution() { 1097 return (getMapSpeculativeExecution() || getReduceSpeculativeExecution()); 1098 } 1099 1100 /** 1101 * Turn speculative execution on or off for this job. 1102 * 1103 * @param speculativeExecution <code>true</code> if speculative execution 1104 * should be turned on, else <code>false</code>. 1105 */ 1106 public void setSpeculativeExecution(boolean speculativeExecution) { 1107 setMapSpeculativeExecution(speculativeExecution); 1108 setReduceSpeculativeExecution(speculativeExecution); 1109 } 1110 1111 /** 1112 * Should speculative execution be used for this job for map tasks? 1113 * Defaults to <code>true</code>. 1114 * 1115 * @return <code>true</code> if speculative execution be 1116 * used for this job for map tasks, 1117 * <code>false</code> otherwise. 1118 */ 1119 public boolean getMapSpeculativeExecution() { 1120 return getBoolean(JobContext.MAP_SPECULATIVE, true); 1121 } 1122 1123 /** 1124 * Turn speculative execution on or off for this job for map tasks. 1125 * 1126 * @param speculativeExecution <code>true</code> if speculative execution 1127 * should be turned on for map tasks, 1128 * else <code>false</code>. 1129 */ 1130 public void setMapSpeculativeExecution(boolean speculativeExecution) { 1131 setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution); 1132 } 1133 1134 /** 1135 * Should speculative execution be used for this job for reduce tasks? 1136 * Defaults to <code>true</code>. 1137 * 1138 * @return <code>true</code> if speculative execution be used 1139 * for reduce tasks for this job, 1140 * <code>false</code> otherwise. 1141 */ 1142 public boolean getReduceSpeculativeExecution() { 1143 return getBoolean(JobContext.REDUCE_SPECULATIVE, true); 1144 } 1145 1146 /** 1147 * Turn speculative execution on or off for this job for reduce tasks. 1148 * 1149 * @param speculativeExecution <code>true</code> if speculative execution 1150 * should be turned on for reduce tasks, 1151 * else <code>false</code>. 1152 */ 1153 public void setReduceSpeculativeExecution(boolean speculativeExecution) { 1154 setBoolean(JobContext.REDUCE_SPECULATIVE, 1155 speculativeExecution); 1156 } 1157 1158 /** 1159 * Get configured the number of reduce tasks for this job. 1160 * Defaults to <code>1</code>. 1161 * 1162 * @return the number of reduce tasks for this job. 1163 */ 1164 public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); } 1165 1166 /** 1167 * Set the number of map tasks for this job. 1168 * 1169 * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 1170 * number of spawned map tasks depends on the number of {@link InputSplit}s 1171 * generated by the job's {@link InputFormat#getSplits(JobConf, int)}. 1172 * 1173 * A custom {@link InputFormat} is typically used to accurately control 1174 * the number of map tasks for the job.</p> 1175 * 1176 * <h4 id="NoOfMaps">How many maps?</h4> 1177 * 1178 * <p>The number of maps is usually driven by the total size of the inputs 1179 * i.e. total number of blocks of the input files.</p> 1180 * 1181 * <p>The right level of parallelism for maps seems to be around 10-100 maps 1182 * per-node, although it has been set up to 300 or so for very cpu-light map 1183 * tasks. Task setup takes awhile, so it is best if the maps take at least a 1184 * minute to execute.</p> 1185 * 1186 * <p>The default behavior of file-based {@link InputFormat}s is to split the 1187 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 1188 * bytes, of input files. However, the {@link FileSystem} blocksize of the 1189 * input files is treated as an upper bound for input splits. A lower bound 1190 * on the split size can be set via 1191 * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> 1192 * mapreduce.input.fileinputformat.split.minsize</a>.</p> 1193 * 1194 * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 1195 * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 1196 * used to set it even higher.</p> 1197 * 1198 * @param n the number of map tasks for this job. 1199 * @see InputFormat#getSplits(JobConf, int) 1200 * @see FileInputFormat 1201 * @see FileSystem#getDefaultBlockSize() 1202 * @see FileStatus#getBlockSize() 1203 */ 1204 public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); } 1205 1206 /** 1207 * Get configured the number of reduce tasks for this job. Defaults to 1208 * <code>1</code>. 1209 * 1210 * @return the number of reduce tasks for this job. 1211 */ 1212 public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); } 1213 1214 /** 1215 * Set the requisite number of reduce tasks for this job. 1216 * 1217 * <h4 id="NoOfReduces">How many reduces?</h4> 1218 * 1219 * <p>The right number of reduces seems to be <code>0.95</code> or 1220 * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> * 1221 * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum"> 1222 * mapreduce.tasktracker.reduce.tasks.maximum</a>). 1223 * </p> 1224 * 1225 * <p>With <code>0.95</code> all of the reduces can launch immediately and 1226 * start transfering map outputs as the maps finish. With <code>1.75</code> 1227 * the faster nodes will finish their first round of reduces and launch a 1228 * second wave of reduces doing a much better job of load balancing.</p> 1229 * 1230 * <p>Increasing the number of reduces increases the framework overhead, but 1231 * increases load balancing and lowers the cost of failures.</p> 1232 * 1233 * <p>The scaling factors above are slightly less than whole numbers to 1234 * reserve a few reduce slots in the framework for speculative-tasks, failures 1235 * etc.</p> 1236 * 1237 * <h4 id="ReducerNone">Reducer NONE</h4> 1238 * 1239 * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p> 1240 * 1241 * <p>In this case the output of the map-tasks directly go to distributed 1242 * file-system, to the path set by 1243 * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 1244 * framework doesn't sort the map-outputs before writing it out to HDFS.</p> 1245 * 1246 * @param n the number of reduce tasks for this job. 1247 */ 1248 public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); } 1249 1250 /** 1251 * Get the configured number of maximum attempts that will be made to run a 1252 * map task, as specified by the <code>mapreduce.map.maxattempts</code> 1253 * property. If this property is not already set, the default is 4 attempts. 1254 * 1255 * @return the max number of attempts per map task. 1256 */ 1257 public int getMaxMapAttempts() { 1258 return getInt(JobContext.MAP_MAX_ATTEMPTS, 4); 1259 } 1260 1261 /** 1262 * Expert: Set the number of maximum attempts that will be made to run a 1263 * map task. 1264 * 1265 * @param n the number of attempts per map task. 1266 */ 1267 public void setMaxMapAttempts(int n) { 1268 setInt(JobContext.MAP_MAX_ATTEMPTS, n); 1269 } 1270 1271 /** 1272 * Get the configured number of maximum attempts that will be made to run a 1273 * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code> 1274 * property. If this property is not already set, the default is 4 attempts. 1275 * 1276 * @return the max number of attempts per reduce task. 1277 */ 1278 public int getMaxReduceAttempts() { 1279 return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4); 1280 } 1281 /** 1282 * Expert: Set the number of maximum attempts that will be made to run a 1283 * reduce task. 1284 * 1285 * @param n the number of attempts per reduce task. 1286 */ 1287 public void setMaxReduceAttempts(int n) { 1288 setInt(JobContext.REDUCE_MAX_ATTEMPTS, n); 1289 } 1290 1291 /** 1292 * Get the user-specified job name. This is only used to identify the 1293 * job to the user. 1294 * 1295 * @return the job's name, defaulting to "". 1296 */ 1297 public String getJobName() { 1298 return get(JobContext.JOB_NAME, ""); 1299 } 1300 1301 /** 1302 * Set the user-specified job name. 1303 * 1304 * @param name the job's new name. 1305 */ 1306 public void setJobName(String name) { 1307 set(JobContext.JOB_NAME, name); 1308 } 1309 1310 /** 1311 * Get the user-specified session identifier. The default is the empty string. 1312 * 1313 * The session identifier is used to tag metric data that is reported to some 1314 * performance metrics system via the org.apache.hadoop.metrics API. The 1315 * session identifier is intended, in particular, for use by Hadoop-On-Demand 1316 * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 1317 * HOD will set the session identifier by modifying the mapred-site.xml file 1318 * before starting the cluster. 1319 * 1320 * When not running under HOD, this identifer is expected to remain set to 1321 * the empty string. 1322 * 1323 * @return the session identifier, defaulting to "". 1324 */ 1325 @Deprecated 1326 public String getSessionId() { 1327 return get("session.id", ""); 1328 } 1329 1330 /** 1331 * Set the user-specified session identifier. 1332 * 1333 * @param sessionId the new session id. 1334 */ 1335 @Deprecated 1336 public void setSessionId(String sessionId) { 1337 set("session.id", sessionId); 1338 } 1339 1340 /** 1341 * Set the maximum no. of failures of a given job per tasktracker. 1342 * If the no. of task failures exceeds <code>noFailures</code>, the 1343 * tasktracker is <i>blacklisted</i> for this job. 1344 * 1345 * @param noFailures maximum no. of failures of a given job per tasktracker. 1346 */ 1347 public void setMaxTaskFailuresPerTracker(int noFailures) { 1348 setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures); 1349 } 1350 1351 /** 1352 * Expert: Get the maximum no. of failures of a given job per tasktracker. 1353 * If the no. of task failures exceeds this, the tasktracker is 1354 * <i>blacklisted</i> for this job. 1355 * 1356 * @return the maximum no. of failures of a given job per tasktracker. 1357 */ 1358 public int getMaxTaskFailuresPerTracker() { 1359 return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3); 1360 } 1361 1362 /** 1363 * Get the maximum percentage of map tasks that can fail without 1364 * the job being aborted. 1365 * 1366 * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 1367 * attempts before being declared as <i>failed</i>. 1368 * 1369 * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in 1370 * the job being declared as {@link JobStatus#FAILED}. 1371 * 1372 * @return the maximum percentage of map tasks that can fail without 1373 * the job being aborted. 1374 */ 1375 public int getMaxMapTaskFailuresPercent() { 1376 return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0); 1377 } 1378 1379 /** 1380 * Expert: Set the maximum percentage of map tasks that can fail without the 1381 * job being aborted. 1382 * 1383 * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 1384 * before being declared as <i>failed</i>. 1385 * 1386 * @param percent the maximum percentage of map tasks that can fail without 1387 * the job being aborted. 1388 */ 1389 public void setMaxMapTaskFailuresPercent(int percent) { 1390 setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent); 1391 } 1392 1393 /** 1394 * Get the maximum percentage of reduce tasks that can fail without 1395 * the job being aborted. 1396 * 1397 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1398 * attempts before being declared as <i>failed</i>. 1399 * 1400 * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 1401 * in the job being declared as {@link JobStatus#FAILED}. 1402 * 1403 * @return the maximum percentage of reduce tasks that can fail without 1404 * the job being aborted. 1405 */ 1406 public int getMaxReduceTaskFailuresPercent() { 1407 return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0); 1408 } 1409 1410 /** 1411 * Set the maximum percentage of reduce tasks that can fail without the job 1412 * being aborted. 1413 * 1414 * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 1415 * attempts before being declared as <i>failed</i>. 1416 * 1417 * @param percent the maximum percentage of reduce tasks that can fail without 1418 * the job being aborted. 1419 */ 1420 public void setMaxReduceTaskFailuresPercent(int percent) { 1421 setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent); 1422 } 1423 1424 /** 1425 * Set {@link JobPriority} for this job. 1426 * 1427 * @param prio the {@link JobPriority} for this job. 1428 */ 1429 public void setJobPriority(JobPriority prio) { 1430 set(JobContext.PRIORITY, prio.toString()); 1431 } 1432 1433 /** 1434 * Get the {@link JobPriority} for this job. 1435 * 1436 * @return the {@link JobPriority} for this job. 1437 */ 1438 public JobPriority getJobPriority() { 1439 String prio = get(JobContext.PRIORITY); 1440 if(prio == null) { 1441 return JobPriority.NORMAL; 1442 } 1443 1444 return JobPriority.valueOf(prio); 1445 } 1446 1447 /** 1448 * Set JobSubmitHostName for this job. 1449 * 1450 * @param hostname the JobSubmitHostName for this job. 1451 */ 1452 void setJobSubmitHostName(String hostname) { 1453 set(MRJobConfig.JOB_SUBMITHOST, hostname); 1454 } 1455 1456 /** 1457 * Get the JobSubmitHostName for this job. 1458 * 1459 * @return the JobSubmitHostName for this job. 1460 */ 1461 String getJobSubmitHostName() { 1462 String hostname = get(MRJobConfig.JOB_SUBMITHOST); 1463 1464 return hostname; 1465 } 1466 1467 /** 1468 * Set JobSubmitHostAddress for this job. 1469 * 1470 * @param hostadd the JobSubmitHostAddress for this job. 1471 */ 1472 void setJobSubmitHostAddress(String hostadd) { 1473 set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd); 1474 } 1475 1476 /** 1477 * Get JobSubmitHostAddress for this job. 1478 * 1479 * @return JobSubmitHostAddress for this job. 1480 */ 1481 String getJobSubmitHostAddress() { 1482 String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR); 1483 1484 return hostadd; 1485 } 1486 1487 /** 1488 * Get whether the task profiling is enabled. 1489 * @return true if some tasks will be profiled 1490 */ 1491 public boolean getProfileEnabled() { 1492 return getBoolean(JobContext.TASK_PROFILE, false); 1493 } 1494 1495 /** 1496 * Set whether the system should collect profiler information for some of 1497 * the tasks in this job? The information is stored in the user log 1498 * directory. 1499 * @param newValue true means it should be gathered 1500 */ 1501 public void setProfileEnabled(boolean newValue) { 1502 setBoolean(JobContext.TASK_PROFILE, newValue); 1503 } 1504 1505 /** 1506 * Get the profiler configuration arguments. 1507 * 1508 * The default value for this property is 1509 * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s" 1510 * 1511 * @return the parameters to pass to the task child to configure profiling 1512 */ 1513 public String getProfileParams() { 1514 return get(JobContext.TASK_PROFILE_PARAMS, 1515 "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," + 1516 "verbose=n,file=%s"); 1517 } 1518 1519 /** 1520 * Set the profiler configuration arguments. If the string contains a '%s' it 1521 * will be replaced with the name of the profiling output file when the task 1522 * runs. 1523 * 1524 * This value is passed to the task child JVM on the command line. 1525 * 1526 * @param value the configuration string 1527 */ 1528 public void setProfileParams(String value) { 1529 set(JobContext.TASK_PROFILE_PARAMS, value); 1530 } 1531 1532 /** 1533 * Get the range of maps or reduces to profile. 1534 * @param isMap is the task a map? 1535 * @return the task ranges 1536 */ 1537 public IntegerRanges getProfileTaskRange(boolean isMap) { 1538 return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 1539 JobContext.NUM_REDUCE_PROFILES), "0-2"); 1540 } 1541 1542 /** 1543 * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 1544 * must also be called. 1545 * @param newValue a set of integer ranges of the map ids 1546 */ 1547 public void setProfileTaskRange(boolean isMap, String newValue) { 1548 // parse the value to make sure it is legal 1549 new Configuration.IntegerRanges(newValue); 1550 set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 1551 newValue); 1552 } 1553 1554 /** 1555 * Set the debug script to run when the map tasks fail. 1556 * 1557 * <p>The debug script can aid debugging of failed map tasks. The script is 1558 * given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1559 * 1560 * <p>The debug command, run on the node where the map failed, is:</p> 1561 * <p><pre><blockquote> 1562 * $script $stdout $stderr $syslog $jobconf. 1563 * </blockquote></pre></p> 1564 * 1565 * <p> The script file is distributed through {@link DistributedCache} 1566 * APIs. The script needs to be symlinked. </p> 1567 * 1568 * <p>Here is an example on how to submit a script 1569 * <p><blockquote><pre> 1570 * job.setMapDebugScript("./myscript"); 1571 * DistributedCache.createSymlink(job); 1572 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1573 * </pre></blockquote></p> 1574 * 1575 * @param mDbgScript the script name 1576 */ 1577 public void setMapDebugScript(String mDbgScript) { 1578 set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript); 1579 } 1580 1581 /** 1582 * Get the map task's debug script. 1583 * 1584 * @return the debug Script for the mapred job for failed map tasks. 1585 * @see #setMapDebugScript(String) 1586 */ 1587 public String getMapDebugScript() { 1588 return get(JobContext.MAP_DEBUG_SCRIPT); 1589 } 1590 1591 /** 1592 * Set the debug script to run when the reduce tasks fail. 1593 * 1594 * <p>The debug script can aid debugging of failed reduce tasks. The script 1595 * is given task's stdout, stderr, syslog, jobconf files as arguments.</p> 1596 * 1597 * <p>The debug command, run on the node where the map failed, is:</p> 1598 * <p><pre><blockquote> 1599 * $script $stdout $stderr $syslog $jobconf. 1600 * </blockquote></pre></p> 1601 * 1602 * <p> The script file is distributed through {@link DistributedCache} 1603 * APIs. The script file needs to be symlinked </p> 1604 * 1605 * <p>Here is an example on how to submit a script 1606 * <p><blockquote><pre> 1607 * job.setReduceDebugScript("./myscript"); 1608 * DistributedCache.createSymlink(job); 1609 * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); 1610 * </pre></blockquote></p> 1611 * 1612 * @param rDbgScript the script name 1613 */ 1614 public void setReduceDebugScript(String rDbgScript) { 1615 set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript); 1616 } 1617 1618 /** 1619 * Get the reduce task's debug Script 1620 * 1621 * @return the debug script for the mapred job for failed reduce tasks. 1622 * @see #setReduceDebugScript(String) 1623 */ 1624 public String getReduceDebugScript() { 1625 return get(JobContext.REDUCE_DEBUG_SCRIPT); 1626 } 1627 1628 /** 1629 * Get the uri to be invoked in-order to send a notification after the job 1630 * has completed (success/failure). 1631 * 1632 * @return the job end notification uri, <code>null</code> if it hasn't 1633 * been set. 1634 * @see #setJobEndNotificationURI(String) 1635 */ 1636 public String getJobEndNotificationURI() { 1637 return get(JobContext.MR_JOB_END_NOTIFICATION_URL); 1638 } 1639 1640 /** 1641 * Set the uri to be invoked in-order to send a notification after the job 1642 * has completed (success/failure). 1643 * 1644 * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 1645 * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 1646 * identifier and completion-status respectively.</p> 1647 * 1648 * <p>This is typically used by application-writers to implement chaining of 1649 * Map-Reduce jobs in an <i>asynchronous manner</i>.</p> 1650 * 1651 * @param uri the job end notification uri 1652 * @see JobStatus 1653 * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html# 1654 * JobCompletionAndChaining">Job Completion and Chaining</a> 1655 */ 1656 public void setJobEndNotificationURI(String uri) { 1657 set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); 1658 } 1659 1660 /** 1661 * Get job-specific shared directory for use as scratch space 1662 * 1663 * <p> 1664 * When a job starts, a shared directory is created at location 1665 * <code> 1666 * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>. 1667 * This directory is exposed to the users through 1668 * <code>mapreduce.job.local.dir </code>. 1669 * So, the tasks can use this space 1670 * as scratch space and share files among them. </p> 1671 * This value is available as System property also. 1672 * 1673 * @return The localized job specific shared directory 1674 */ 1675 public String getJobLocalDir() { 1676 return get(JobContext.JOB_LOCAL_DIR); 1677 } 1678 1679 /** 1680 * Get memory required to run a map task of the job, in MB. 1681 * 1682 * If a value is specified in the configuration, it is returned. 1683 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1684 * <p/> 1685 * For backward compatibility, if the job configuration sets the 1686 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1687 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1688 * after converting it from bytes to MB. 1689 * @return memory required to run a map task of the job, in MB, 1690 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 1691 */ 1692 public long getMemoryForMapTask() { 1693 long value = getDeprecatedMemoryValue(); 1694 if (value == DISABLED_MEMORY_LIMIT) { 1695 value = normalizeMemoryConfigValue( 1696 getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 1697 DISABLED_MEMORY_LIMIT)); 1698 } 1699 return value; 1700 } 1701 1702 public void setMemoryForMapTask(long mem) { 1703 setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); 1704 } 1705 1706 /** 1707 * Get memory required to run a reduce task of the job, in MB. 1708 * 1709 * If a value is specified in the configuration, it is returned. 1710 * Else, it returns {@link #DISABLED_MEMORY_LIMIT}. 1711 * <p/> 1712 * For backward compatibility, if the job configuration sets the 1713 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1714 * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used 1715 * after converting it from bytes to MB. 1716 * @return memory required to run a reduce task of the job, in MB, 1717 * or {@link #DISABLED_MEMORY_LIMIT} if unset. 1718 */ 1719 public long getMemoryForReduceTask() { 1720 long value = getDeprecatedMemoryValue(); 1721 if (value == DISABLED_MEMORY_LIMIT) { 1722 value = normalizeMemoryConfigValue( 1723 getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 1724 DISABLED_MEMORY_LIMIT)); 1725 } 1726 return value; 1727 } 1728 1729 // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY, 1730 // converted into MBs. 1731 // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative 1732 // value. 1733 private long getDeprecatedMemoryValue() { 1734 long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 1735 DISABLED_MEMORY_LIMIT); 1736 oldValue = normalizeMemoryConfigValue(oldValue); 1737 if (oldValue != DISABLED_MEMORY_LIMIT) { 1738 oldValue /= (1024*1024); 1739 } 1740 return oldValue; 1741 } 1742 1743 public void setMemoryForReduceTask(long mem) { 1744 setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); 1745 } 1746 1747 /** 1748 * Return the name of the queue to which this job is submitted. 1749 * Defaults to 'default'. 1750 * 1751 * @return name of the queue 1752 */ 1753 public String getQueueName() { 1754 return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME); 1755 } 1756 1757 /** 1758 * Set the name of the queue to which this job should be submitted. 1759 * 1760 * @param queueName Name of the queue 1761 */ 1762 public void setQueueName(String queueName) { 1763 set(JobContext.QUEUE_NAME, queueName); 1764 } 1765 1766 /** 1767 * Normalize the negative values in configuration 1768 * 1769 * @param val 1770 * @return normalized value 1771 */ 1772 public static long normalizeMemoryConfigValue(long val) { 1773 if (val < 0) { 1774 val = DISABLED_MEMORY_LIMIT; 1775 } 1776 return val; 1777 } 1778 1779 /** 1780 * Compute the number of slots required to run a single map task-attempt 1781 * of this job. 1782 * @param slotSizePerMap cluster-wide value of the amount of memory required 1783 * to run a map-task 1784 * @return the number of slots required to run a single map task-attempt 1785 * 1 if memory parameters are disabled. 1786 */ 1787 int computeNumSlotsPerMap(long slotSizePerMap) { 1788 if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) || 1789 (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) { 1790 return 1; 1791 } 1792 return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap)); 1793 } 1794 1795 /** 1796 * Compute the number of slots required to run a single reduce task-attempt 1797 * of this job. 1798 * @param slotSizePerReduce cluster-wide value of the amount of memory 1799 * required to run a reduce-task 1800 * @return the number of slots required to run a single reduce task-attempt 1801 * 1 if memory parameters are disabled 1802 */ 1803 int computeNumSlotsPerReduce(long slotSizePerReduce) { 1804 if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) || 1805 (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) { 1806 return 1; 1807 } 1808 return 1809 (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce)); 1810 } 1811 1812 /** 1813 * Find a jar that contains a class of the same name, if any. 1814 * It will return a jar file, even if that is not the first thing 1815 * on the class path that has a class with the same name. 1816 * 1817 * @param my_class the class to find. 1818 * @return a jar file that contains the class, or null. 1819 * @throws IOException 1820 */ 1821 public static String findContainingJar(Class my_class) { 1822 ClassLoader loader = my_class.getClassLoader(); 1823 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; 1824 try { 1825 for(Enumeration itr = loader.getResources(class_file); 1826 itr.hasMoreElements();) { 1827 URL url = (URL) itr.nextElement(); 1828 if ("jar".equals(url.getProtocol())) { 1829 String toReturn = url.getPath(); 1830 if (toReturn.startsWith("file:")) { 1831 toReturn = toReturn.substring("file:".length()); 1832 } 1833 // URLDecoder is a misnamed class, since it actually decodes 1834 // x-www-form-urlencoded MIME type rather than actual 1835 // URL encoding (which the file path has). Therefore it would 1836 // decode +s to ' 's which is incorrect (spaces are actually 1837 // either unencoded or encoded as "%20"). Replace +s first, so 1838 // that they are kept sacred during the decoding process. 1839 toReturn = toReturn.replaceAll("\\+", "%2B"); 1840 toReturn = URLDecoder.decode(toReturn, "UTF-8"); 1841 return toReturn.replaceAll("!.*$", ""); 1842 } 1843 } 1844 } catch (IOException e) { 1845 throw new RuntimeException(e); 1846 } 1847 return null; 1848 } 1849 1850 1851 /** 1852 * Get the memory required to run a task of this job, in bytes. See 1853 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1854 * <p/> 1855 * This method is deprecated. Now, different memory limits can be 1856 * set for map and reduce tasks of a job, in MB. 1857 * <p/> 1858 * For backward compatibility, if the job configuration sets the 1859 * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different 1860 * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 1861 * Otherwise, this method will return the larger of the values returned by 1862 * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} 1863 * after converting them into bytes. 1864 * 1865 * @return Memory required to run a task of this job, in bytes, 1866 * or {@link #DISABLED_MEMORY_LIMIT}, if unset. 1867 * @see #setMaxVirtualMemoryForTask(long) 1868 * @deprecated Use {@link #getMemoryForMapTask()} and 1869 * {@link #getMemoryForReduceTask()} 1870 */ 1871 @Deprecated 1872 public long getMaxVirtualMemoryForTask() { 1873 LOG.warn( 1874 "getMaxVirtualMemoryForTask() is deprecated. " + 1875 "Instead use getMemoryForMapTask() and getMemoryForReduceTask()"); 1876 1877 long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT); 1878 value = normalizeMemoryConfigValue(value); 1879 if (value == DISABLED_MEMORY_LIMIT) { 1880 value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask()); 1881 value = normalizeMemoryConfigValue(value); 1882 if (value != DISABLED_MEMORY_LIMIT) { 1883 value *= 1024*1024; 1884 } 1885 } 1886 return value; 1887 } 1888 1889 /** 1890 * Set the maximum amount of memory any task of this job can use. See 1891 * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} 1892 * <p/> 1893 * mapred.task.maxvmem is split into 1894 * mapreduce.map.memory.mb 1895 * and mapreduce.map.memory.mb,mapred 1896 * each of the new key are set 1897 * as mapred.task.maxvmem / 1024 1898 * as new values are in MB 1899 * 1900 * @param vmem Maximum amount of virtual memory in bytes any task of this job 1901 * can use. 1902 * @see #getMaxVirtualMemoryForTask() 1903 * @deprecated 1904 * Use {@link #setMemoryForMapTask(long mem)} and 1905 * Use {@link #setMemoryForReduceTask(long mem)} 1906 */ 1907 @Deprecated 1908 public void setMaxVirtualMemoryForTask(long vmem) { 1909 LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+ 1910 "Instead use setMemoryForMapTask() and setMemoryForReduceTask()"); 1911 if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) { 1912 setMemoryForMapTask(DISABLED_MEMORY_LIMIT); 1913 setMemoryForReduceTask(DISABLED_MEMORY_LIMIT); 1914 } 1915 1916 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) { 1917 setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb 1918 setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb 1919 }else{ 1920 this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem); 1921 } 1922 } 1923 1924 /** 1925 * @deprecated this variable is deprecated and nolonger in use. 1926 */ 1927 @Deprecated 1928 public long getMaxPhysicalMemoryForTask() { 1929 LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated." 1930 + " Refer to the APIs getMemoryForMapTask() and" 1931 + " getMemoryForReduceTask() for details."); 1932 return -1; 1933 } 1934 1935 /* 1936 * @deprecated this 1937 */ 1938 @Deprecated 1939 public void setMaxPhysicalMemoryForTask(long mem) { 1940 LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated." 1941 + " The value set is ignored. Refer to " 1942 + " setMemoryForMapTask() and setMemoryForReduceTask() for details."); 1943 } 1944 1945 static String deprecatedString(String key) { 1946 return "The variable " + key + " is no longer used."; 1947 } 1948 1949 private void checkAndWarnDeprecation() { 1950 if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) { 1951 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) 1952 + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY 1953 + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); 1954 } 1955 if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) { 1956 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT)); 1957 } 1958 if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) { 1959 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT)); 1960 } 1961 if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) { 1962 LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT)); 1963 } 1964 } 1965 1966 1967} 1968