org.apache.hadoop.mapred
Class SkipBadRecords

java.lang.Object
  extended by org.apache.hadoop.mapred.SkipBadRecords

@InterfaceAudience.Public
@InterfaceStability.Stable
public class SkipBadRecords
extends Object

Utility class for the skip-bad-records feature. It holds the settings that control how bad records are skipped.

Hadoop provides an optional mode of execution in which bad records are detected and then skipped in subsequent task attempts.

This feature is useful when map/reduce tasks crash deterministically on certain input, typically because of bugs in the map/reduce function. The usual course is to fix these bugs, but sometimes that is not possible; for example, the bug may be in a third-party library whose source code is not available. In that case the task never completes, even after multiple attempts, and all of the data for that task is lost.

With this feature, only a small portion of the data surrounding the bad record is lost, which may be acceptable for some user applications. See setMapperMaxSkipRecords(Configuration, long).

Skipping mode starts after a certain number of failures. See setAttemptsToStartSkipping(Configuration, int).

In skipping mode, the map/reduce task keeps track of the record range it is currently processing. Before passing the input to the map/reduce function, it sends this record range to the TaskTracker. If the task crashes, the TaskTracker knows which range was reported last, and that range is skipped on further attempts.
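The report-then-skip cycle can be modelled in plain Java. The sketch below is a toy simulation under stated assumptions, not Hadoop's implementation: the class SkipModeSketch, its runTask method and the in-memory set that plays the TaskTracker are hypothetical, every reported range covers exactly one record, and skip mode is assumed to start once the number of failed attempts reaches the configured threshold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

/** Toy model of skipping mode (hypothetical names, not Hadoop code). */
final class SkipModeSketch {

    /** Outcome of the simulated task. */
    static final class Result {
        final List<String> processed = new ArrayList<>();
        final Set<Integer> skipped = new TreeSet<>(); // record indexes to skip
        int attempts;
        boolean succeeded;
    }

    static Result runTask(List<String> input, Consumer<String> mapFn,
                          int attemptsToStartSkipping, int maxAttempts) {
        Result result = new Result();
        int failures = 0;
        while (result.attempts < maxAttempts) {
            result.attempts++;
            // Assumption: skip mode is active once enough attempts have failed.
            boolean skipping = failures >= attemptsToStartSkipping;
            int lastReported = -1; // last range reported to the "tracker"
            List<String> out = new ArrayList<>();
            try {
                for (int i = 0; i < input.size(); i++) {
                    if (result.skipped.contains(i)) {
                        continue; // range marked bad by an earlier attempt
                    }
                    if (skipping) {
                        lastReported = i; // report the range before calling map
                    }
                    mapFn.accept(input.get(i));
                    out.add(input.get(i));
                }
                result.processed.addAll(out); // output is kept only on success
                result.succeeded = true;
                return result;
            } catch (RuntimeException crash) {
                failures++;
                if (skipping && lastReported >= 0) {
                    result.skipped.add(lastReported); // skip it next time
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "BAD", "c");
        Result r = runTask(input, rec -> {
            if (rec.equals("BAD")) {
                throw new IllegalStateException("crash on " + rec);
            }
        }, 2, 4);
        System.out.println(r.succeeded + " " + r.attempts + " "
                + r.processed + " " + r.skipped);
    }
}
```

With a threshold of 2 and four attempts allowed, the first two attempts fail without any reporting, the third fails with the bad record as the last reported range, and the fourth succeeds after skipping it, so the program prints `true 4 [a, b, c] [2]`.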


Field Summary
static String COUNTER_GROUP
          Special counters that are written by the application and used by the framework to detect bad records.
static String COUNTER_MAP_PROCESSED_RECORDS
          Number of processed map records.
static String COUNTER_REDUCE_PROCESSED_GROUPS
          Number of processed reduce groups.
 
Constructor Summary
SkipBadRecords()
           
 
Method Summary
static int getAttemptsToStartSkipping(Configuration conf)
          Get the number of task attempts after which skip mode starts.
static boolean getAutoIncrMapperProcCount(Configuration conf)
          Get the flag that, when true, makes MapRunner increment COUNTER_MAP_PROCESSED_RECORDS after invoking the map function.
static boolean getAutoIncrReducerProcCount(Configuration conf)
          Get the flag that, when true, makes the framework increment COUNTER_REDUCE_PROCESSED_GROUPS after invoking the reduce function.
static long getMapperMaxSkipRecords(Configuration conf)
          Get the number of records surrounding a bad record that may acceptably be skipped, per bad record, in the mapper.
static long getReducerMaxSkipGroups(Configuration conf)
          Get the number of groups surrounding a bad group that may acceptably be skipped, per bad group, in the reducer.
static Path getSkipOutputPath(Configuration conf)
          Get the directory to which skipped records are written.
static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)
          Set the number of task attempts after which skip mode starts.
static void setAutoIncrMapperProcCount(Configuration conf, boolean autoIncr)
          Set the flag that, when true, makes MapRunner increment COUNTER_MAP_PROCESSED_RECORDS after invoking the map function.
static void setAutoIncrReducerProcCount(Configuration conf, boolean autoIncr)
          Set the flag that, when true, makes the framework increment COUNTER_REDUCE_PROCESSED_GROUPS after invoking the reduce function.
static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)
          Set the number of records surrounding a bad record that may acceptably be skipped, per bad record, in the mapper.
static void setReducerMaxSkipGroups(Configuration conf, long maxSkipGrps)
          Set the number of groups surrounding a bad group that may acceptably be skipped, per bad group, in the reducer.
static void setSkipOutputPath(JobConf conf, Path path)
          Set the directory to which skipped records are written.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

COUNTER_GROUP

public static final String COUNTER_GROUP
Special counters that are written by the application and used by the framework to detect bad records. For bad-record detection to work, the application must increment these counters.

See Also:
Constant Field Values

COUNTER_MAP_PROCESSED_RECORDS

public static final String COUNTER_MAP_PROCESSED_RECORDS
Number of processed map records.

See Also:
getAutoIncrMapperProcCount(Configuration), Constant Field Values

COUNTER_REDUCE_PROCESSED_GROUPS

public static final String COUNTER_REDUCE_PROCESSED_GROUPS
Number of processed reduce groups.

See Also:
getAutoIncrReducerProcCount(Configuration), Constant Field Values

Constructor Detail

SkipBadRecords

public SkipBadRecords()

Method Detail

getAttemptsToStartSkipping

public static int getAttemptsToStartSkipping(Configuration conf)
Get the number of task attempts after which skip mode starts. Once skip mode has started, the task reports to the TaskTracker the range of records it is about to process, so that on failure the TaskTracker knows which records are possibly bad. Those records are skipped on further executions. The default value is 2.

Parameters:
conf - the configuration
Returns:
the number of task attempts after which skip mode starts

setAttemptsToStartSkipping

public static void setAttemptsToStartSkipping(Configuration conf,
                                              int attemptsToStartSkipping)
Set the number of task attempts after which skip mode starts. Once skip mode has started, the task reports to the TaskTracker the range of records it is about to process, so that on failure the TaskTracker knows which records are possibly bad. Those records are skipped on further executions. The default value is 2.

Parameters:
conf - the configuration
attemptsToStartSkipping - the number of task attempts

getAutoIncrMapperProcCount

public static boolean getAutoIncrMapperProcCount(Configuration conf)
Get the flag that, when true, makes MapRunner increment COUNTER_MAP_PROCESSED_RECORDS after invoking the map function. The flag must be set to false for applications that process records asynchronously or buffer their input records, such as streaming. Such applications should increment the counter themselves. The default value is true.

Parameters:
conf - the configuration
Returns:
true if COUNTER_MAP_PROCESSED_RECORDS is incremented automatically; false otherwise.

setAutoIncrMapperProcCount

public static void setAutoIncrMapperProcCount(Configuration conf,
                                              boolean autoIncr)
Set the flag that, when true, makes MapRunner increment COUNTER_MAP_PROCESSED_RECORDS after invoking the map function. The flag must be set to false for applications that process records asynchronously or buffer their input records, such as streaming. Such applications should increment the counter themselves. The default value is true.

Parameters:
conf - the configuration
autoIncr - whether to increment COUNTER_MAP_PROCESSED_RECORDS automatically.
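To illustrate why a buffering application has to maintain the counter itself, here is a minimal sketch in plain Java. It does not use Hadoop classes: CounterSink is a hypothetical stand-in for the task's counter facility, the PROCESSED label is a placeholder rather than the real value of COUNTER_MAP_PROCESSED_RECORDS, and BufferingMapperSketch is an invented name. The point it shows is that the counter advances only when buffered records have actually been processed, not when map is invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Toy buffering mapper that maintains the processed-records counter itself. */
final class BufferingMapperSketch {

    /** Hypothetical stand-in for the framework's counter facility. */
    interface CounterSink {
        void increment(String counterName, long amount);
    }

    // Placeholder label; a real job would use the value of
    // SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.
    static final String PROCESSED = "map-processed-records (placeholder)";

    private final int batchSize;
    private final CounterSink counters;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    BufferingMapperSketch(int batchSize, CounterSink counters) {
        this.batchSize = batchSize;
        this.counters = counters;
    }

    /** Called once per input record; it only buffers, so nothing is counted yet. */
    void map(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Processes the buffered records, then reports how many were processed. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        for (String record : buffer) {
            output.add(record.toUpperCase(Locale.ROOT));
        }
        counters.increment(PROCESSED, buffer.size());
        buffer.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        long[] processed = {0};
        BufferingMapperSketch mapper =
                new BufferingMapperSketch(3, (name, n) -> processed[0] += n);
        for (String rec : new String[] {"a", "b", "c", "d"}) {
            mapper.map(rec);
            System.out.println("after " + rec + ": processed=" + processed[0]);
        }
        mapper.flush(); // end of input: process the remaining record
        System.out.println("after flush: processed=" + processed[0]);
    }
}
```

If the framework incremented the counter after every map call instead, it would report "b" as processed while "a" was still sitting in the buffer, and a later crash would be attributed to the wrong record range.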

getAutoIncrReducerProcCount

public static boolean getAutoIncrReducerProcCount(Configuration conf)
Get the flag that, when true, makes the framework increment COUNTER_REDUCE_PROCESSED_GROUPS after invoking the reduce function. The flag must be set to false for applications that process records asynchronously or buffer their input records, such as streaming. Such applications should increment the counter themselves. The default value is true.

Parameters:
conf - the configuration
Returns:
true if COUNTER_REDUCE_PROCESSED_GROUPS is incremented automatically; false otherwise.

setAutoIncrReducerProcCount

public static void setAutoIncrReducerProcCount(Configuration conf,
                                               boolean autoIncr)
Set the flag that, when true, makes the framework increment COUNTER_REDUCE_PROCESSED_GROUPS after invoking the reduce function. The flag must be set to false for applications that process records asynchronously or buffer their input records, such as streaming. Such applications should increment the counter themselves. The default value is true.

Parameters:
conf - the configuration
autoIncr - whether to increment COUNTER_REDUCE_PROCESSED_GROUPS automatically.

getSkipOutputPath

public static Path getSkipOutputPath(Configuration conf)
Get the directory to which skipped records are written. By default it is a subdirectory of the _logs directory under the output directory. To stop skipped records from being written, set the value to null.

Parameters:
conf - the configuration.
Returns:
the skip output directory, or null if neither this directory nor the output directory is set.

setSkipOutputPath

public static void setSkipOutputPath(JobConf conf,
                                     Path path)
Set the directory to which skipped records are written. By default it is a subdirectory of the _logs directory under the output directory. To stop skipped records from being written, set the value to null.

Parameters:
conf - the configuration.
path - skip output directory path

getMapperMaxSkipRecords

public static long getMapperMaxSkipRecords(Configuration conf)
Get the number of records surrounding a bad record that may acceptably be skipped, per bad record, in the mapper. The number includes the bad record itself. To turn off detection and skipping of bad records, set the value to 0. The framework tries to narrow down the skipped range by retrying until this threshold is met or all attempts for the task are exhausted. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range down; whatever records get skipped (this depends on the application) are acceptable. The default value is 0.

Parameters:
conf - the configuration
Returns:
the acceptable number of skipped records.

setMapperMaxSkipRecords

public static void setMapperMaxSkipRecords(Configuration conf,
                                           long maxSkipRecs)
Set the number of records surrounding a bad record that may acceptably be skipped, per bad record, in the mapper. The number includes the bad record itself. To turn off detection and skipping of bad records, set the value to 0. The framework tries to narrow down the skipped range by retrying until this threshold is met or all attempts for the task are exhausted. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range down; whatever records get skipped (this depends on the application) are acceptable. The default value is 0.

Parameters:
conf - the configuration
maxSkipRecs - the acceptable number of skipped records.
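The interplay between the threshold and the remaining attempts can be sketched as a narrowing loop. This is an illustration only: the description above does not say how the framework narrows the range, so the halving strategy here is an assumption, as are the names NarrowingSketch, Range and narrow and the premise that the suspect range contains exactly one bad record. Each loop iteration stands for one retry that runs the first half of the suspect range.

```java
import java.util.function.LongPredicate;

/** Toy model of narrowing a suspect record range (hypothetical, not Hadoop code). */
final class NarrowingSketch {

    /** Half-open range [start, end) of record indexes. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long length() {
            return end - start;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Narrows the suspect range until it holds at most maxSkipRecords records
     * or no attempts are left. Returns the range that would be skipped.
     */
    static Range narrow(Range suspect, long maxSkipRecords, int attemptsLeft,
                        LongPredicate crashesOn) {
        if (maxSkipRecords < 1) {
            throw new IllegalArgumentException("0 turns skipping off; nothing to narrow");
        }
        Range current = suspect;
        while (current.length() > maxSkipRecords && attemptsLeft > 0) {
            long mid = current.start + current.length() / 2;
            // One retry: run only the first half and see whether the task crashes.
            boolean crashed = false;
            for (long i = current.start; i < mid; i++) {
                if (crashesOn.test(i)) {
                    crashed = true;
                    break;
                }
            }
            current = crashed ? new Range(current.start, mid)
                              : new Range(mid, current.end);
            attemptsLeft--;
        }
        return current;
    }

    public static void main(String[] args) {
        Range suspect = new Range(0, 16);
        LongPredicate bad = i -> i == 11; // record 11 crashes the map function
        System.out.println(narrow(suspect, 1, 10, bad));
        System.out.println(narrow(suspect, 1, 2, bad));
        System.out.println(narrow(suspect, Long.MAX_VALUE, 10, bad));
    }
}
```

In this model a threshold of 1 with enough attempts isolates the single bad record and prints `[11, 12)`; with only two attempts left the loop stops early at `[8, 12)`, so four records are skipped; and Long.MAX_VALUE leaves the range `[0, 16)` untouched because no narrowing is requested.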

getReducerMaxSkipGroups

public static long getReducerMaxSkipGroups(Configuration conf)
Get the number of groups surrounding a bad group that may acceptably be skipped, per bad group, in the reducer. The number includes the bad group itself. To turn off detection and skipping of bad groups, set the value to 0. The framework tries to narrow down the skipped range by retrying until this threshold is met or all attempts for the task are exhausted. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range down; whatever groups get skipped (this depends on the application) are acceptable. The default value is 0.

Parameters:
conf - the configuration
Returns:
the acceptable number of skipped groups.

setReducerMaxSkipGroups

public static void setReducerMaxSkipGroups(Configuration conf,
                                           long maxSkipGrps)
Set the number of groups surrounding a bad group that may acceptably be skipped, per bad group, in the reducer. The number includes the bad group itself. To turn off detection and skipping of bad groups, set the value to 0. The framework tries to narrow down the skipped range by retrying until this threshold is met or all attempts for the task are exhausted. Set the value to Long.MAX_VALUE to indicate that the framework need not narrow the range down; whatever groups get skipped (this depends on the application) are acceptable. The default value is 0.

Parameters:
conf - the configuration
maxSkipGrps - the acceptable number of skipped groups.


Copyright © 2014 Apache Software Foundation. All Rights Reserved.