@InterfaceAudience.Public @InterfaceStability.Stable public class SkipBadRecords extends Object
Hadoop provides an optional mode of execution in which bad records are detected and skipped in further attempts.
This feature can be used when map/reduce tasks crash deterministically on certain input. This usually happens due to bugs in the map/reduce function. The usual course would be to fix these bugs, but sometimes this is not possible; perhaps the bug is in a third-party library whose source code is not available. In that case the task never completes, even with multiple attempts, and all of the data for that task is lost.
With this feature, only a small portion of data surrounding the bad record is lost, which may be acceptable for some user applications. See setMapperMaxSkipRecords(Configuration, long).
Skipping mode is kicked off after a certain number of failures. See setAttemptsToStartSkipping(Configuration, int).
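As a minimal sketch, assuming the classic `org.apache.hadoop.mapred` API with `hadoop-common` and the MapReduce client jars on the classpath, a job could opt in to skipping mode through the static setters of this class. The class name `SkipModeConfig` and the values used (3 attempts, at most 1 skipped record or group per bad one) are illustrative assumptions, not recommended settings.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.SkipBadRecords;

/** Hypothetical helper showing how a job might opt in to skipping mode. */
public class SkipModeConfig {

    /** Applies illustrative skip-mode settings to the given configuration. */
    static Configuration enableSkipping(Configuration conf) {
        // Kick off skipping mode after 3 task attempts (illustrative value).
        SkipBadRecords.setAttemptsToStartSkipping(conf, 3);
        // Tolerate losing at most 1 map record around each bad record.
        SkipBadRecords.setMapperMaxSkipRecords(conf, 1L);
        // Tolerate losing at most 1 reduce group around each bad group.
        SkipBadRecords.setReducerMaxSkipGroups(conf, 1L);
        return conf;
    }

    public static void main(String[] args) {
        Configuration conf = enableSkipping(new Configuration());
        // Read the values back through the matching getters.
        System.out.println("attempts before skipping: "
                + SkipBadRecords.getAttemptsToStartSkipping(conf));
        System.out.println("max skipped map records: "
                + SkipBadRecords.getMapperMaxSkipRecords(conf));
        System.out.println("max skipped reduce groups: "
                + SkipBadRecords.getReducerMaxSkipGroups(conf));
    }
}
```

Each getter returns the value stored by its setter, so the same calls can be used to inspect an existing job's settings.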
In skipping mode, the map/reduce task always keeps track of the range of records it is currently processing. Before passing input to the map/reduce function, it sends this record range to the TaskTracker. If the task crashes, the TaskTracker knows which range was reported last, and further attempts skip that range.
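The range-reporting protocol above can be illustrated without a cluster. The following is a hypothetical, simplified model in plain Java, not Hadoop code: a deterministic bug crashes on any record containing `bad`, attempts after `attemptsToStartSkipping` run in skipping mode and report the index of each record before processing it (standing in for the report to the TaskTracker), and after a crash the last reported index is skipped by later attempts. The names `SkipSimulation`, `buggyMap`, and `runWithSkipping` are invented for this sketch, and each reported range is a single record to keep it short.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of skipping mode; not part of Hadoop. */
public class SkipSimulation {

    /** Signals a crash of the (hypothetical) buggy map function. */
    static class CrashException extends RuntimeException {
        CrashException(String message) {
            super(message);
        }
    }

    /** Deterministic bug: always crashes on records containing "bad". */
    static String buggyMap(String record) {
        if (record.contains("bad")) {
            throw new CrashException("crashed on " + record);
        }
        return record.toUpperCase();
    }

    /**
     * Retries the task up to maxAttempts times. Attempts numbered above
     * attemptsToStartSkipping run in skipping mode: they report each record
     * index before processing it, and a crash adds the last reported index to
     * the set of records skipped by later attempts.
     */
    static List<String> runWithSkipping(List<String> input,
                                        int attemptsToStartSkipping,
                                        int maxAttempts) {
        Set<Integer> skipped = new HashSet<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean skipMode = attempt > attemptsToStartSkipping;
            List<String> output = new ArrayList<>();
            int lastReported = -1;
            try {
                for (int i = 0; i < input.size(); i++) {
                    if (skipped.contains(i)) {
                        continue; // range reported before an earlier crash
                    }
                    if (skipMode) {
                        lastReported = i; // "send" the range [i, i] to the tracker
                    }
                    output.add(buggyMap(input.get(i)));
                }
                return output; // this attempt ran to completion
            } catch (CrashException e) {
                if (skipMode && lastReported >= 0) {
                    skipped.add(lastReported); // the tracker remembers the last range
                }
            }
        }
        throw new IllegalStateException("task failed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "bad1", "b", "bad2", "c");
        // Attempts 1-2 fail normally, 3-4 each locate one bad record, 5 succeeds.
        System.out.println(runWithSkipping(input, 2, 6)); // prints [A, B, C]
    }
}
```

With two bad records and skipping starting after two attempts, the model needs five attempts: two ordinary failures, one failure per bad record in skipping mode, and one clean run.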
Modifier and Type | Field and Description |
---|---|
static String | COUNTER_GROUP: Special counters which are written by the application and are used by the framework for detecting bad records. |
static String | COUNTER_MAP_PROCESSED_RECORDS: Number of processed map records. |
static String | COUNTER_REDUCE_PROCESSED_GROUPS: Number of processed reduce groups. |
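These counters matter mainly for applications that do not process records one at a time. A minimal sketch, assuming the classic `org.apache.hadoop.mapred` API: a hypothetical `BatchingMapper` buffers input lines, expects the job to turn off automatic counting with `setAutoIncrMapperProcCount(conf, false)`, and increments `COUNTER_MAP_PROCESSED_RECORDS` in `COUNTER_GROUP` itself through `Reporter.incrCounter(String, String, long)` once a batch has actually been processed. The batch size and the output (each line with its length) are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;

/** Hypothetical mapper that processes its input in batches. */
public class BatchingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    static final int BATCH_SIZE = 100; // illustrative batch size

    private final List<String> buffer = new ArrayList<>();
    private OutputCollector<Text, LongWritable> output;
    private Reporter reporter;

    /** Job setup: the framework must not count records that are only buffered. */
    static void disableAutoIncrement(JobConf job) {
        SkipBadRecords.setAutoIncrMapperProcCount(job, false);
    }

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        // Keep references so close() can flush the final, partial batch.
        this.output = output;
        this.reporter = reporter;
        buffer.add(value.toString());
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    @Override
    public void close() throws IOException {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() throws IOException {
        for (String line : buffer) {
            output.collect(new Text(line), new LongWritable(line.length()));
        }
        // Only now have these records really been processed, so report them.
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
                SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, buffer.size());
        buffer.clear();
    }
}
```

The same pattern applies on the reduce side with `setAutoIncrReducerProcCount` and `COUNTER_REDUCE_PROCESSED_GROUPS`.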
Constructor and Description |
---|
SkipBadRecords() |
Modifier and Type | Method and Description |
---|---|
static int | getAttemptsToStartSkipping(Configuration conf): Get the number of task attempts AFTER which skip mode will be kicked off. |
static boolean | getAutoIncrMapperProcCount(Configuration conf): Get the flag which, if set to true, causes COUNTER_MAP_PROCESSED_RECORDS to be incremented by MapRunner after invoking the map function. |
static boolean | getAutoIncrReducerProcCount(Configuration conf): Get the flag which, if set to true, causes COUNTER_REDUCE_PROCESSED_GROUPS to be incremented by the framework after invoking the reduce function. |
static long | getMapperMaxSkipRecords(Configuration conf): Get the number of acceptable skip records surrounding the bad record PER bad record in the mapper. |
static long | getReducerMaxSkipGroups(Configuration conf): Get the number of acceptable skip groups surrounding the bad group PER bad group in the reducer. |
static Path | getSkipOutputPath(Configuration conf): Get the directory to which skipped records are written. |
static void | setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping): Set the number of task attempts AFTER which skip mode will be kicked off. |
static void | setAutoIncrMapperProcCount(Configuration conf, boolean autoIncr): Set the flag which, if set to true, causes COUNTER_MAP_PROCESSED_RECORDS to be incremented by MapRunner after invoking the map function. |
static void | setAutoIncrReducerProcCount(Configuration conf, boolean autoIncr): Set the flag which, if set to true, causes COUNTER_REDUCE_PROCESSED_GROUPS to be incremented by the framework after invoking the reduce function. |
static void | setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs): Set the number of acceptable skip records surrounding the bad record PER bad record in the mapper. |
static void | setReducerMaxSkipGroups(Configuration conf, long maxSkipGrps): Set the number of acceptable skip groups surrounding the bad group PER bad group in the reducer. |
static void | setSkipOutputPath(JobConf conf, Path path): Set the directory to which skipped records are written. |
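The remaining setters can be exercised the same way. A minimal sketch, again assuming the classic `org.apache.hadoop.mapred` API: it directs skipped records to a hypothetical directory `/tmp/skipped-records` with `setSkipOutputPath(JobConf, Path)`, which takes a `JobConf` rather than a plain `Configuration`, and disables automatic counting of reduce groups for a reducer that buffers its input. The class name `SkipOutputSetup` is invented for illustration.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

/** Hypothetical job-setup helper for the skip output directory and reduce-side counting. */
public class SkipOutputSetup {

    static JobConf configure(JobConf job, Path skipDir) {
        // Skipped records are written under this directory.
        SkipBadRecords.setSkipOutputPath(job, skipDir);
        // A reducer that buffers groups increments COUNTER_REDUCE_PROCESSED_GROUPS itself.
        SkipBadRecords.setAutoIncrReducerProcCount(job, false);
        return job;
    }

    public static void main(String[] args) {
        JobConf job = new JobConf();
        System.out.println("auto-increment before: "
                + SkipBadRecords.getAutoIncrReducerProcCount(job)); // documented default: true
        configure(job, new Path("/tmp/skipped-records"));
        System.out.println("auto-increment after: "
                + SkipBadRecords.getAutoIncrReducerProcCount(job));
        System.out.println("skip output path: " + SkipBadRecords.getSkipOutputPath(job));
    }
}
```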
public static final String COUNTER_GROUP
public static final String COUNTER_MAP_PROCESSED_RECORDS
public static final String COUNTER_REDUCE_PROCESSED_GROUPS
public static int getAttemptsToStartSkipping(Configuration conf)
Parameters:
conf - the configuration
public static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)
Parameters:
conf - the configuration
attemptsToStartSkipping - number of task attempts
public static boolean getAutoIncrMapperProcCount(Configuration conf)
Get the flag which, if set to true, causes COUNTER_MAP_PROCESSED_RECORDS to be incremented by MapRunner after invoking the map function. This value must be set to false for applications which process the records asynchronously or buffer the input records, for example streaming. In such cases, applications should increment this counter on their own. Default value is true.
Parameters:
conf - the configuration
Returns:
true if COUNTER_MAP_PROCESSED_RECORDS is auto-incremented, false otherwise.
public static void setAutoIncrMapperProcCount(Configuration conf, boolean autoIncr)
Set the flag which, if set to true, causes COUNTER_MAP_PROCESSED_RECORDS to be incremented by MapRunner after invoking the map function. This value must be set to false for applications which process the records asynchronously or buffer the input records, for example streaming. In such cases, applications should increment this counter on their own. Default value is true.
Parameters:
conf - the configuration
autoIncr - whether to auto-increment COUNTER_MAP_PROCESSED_RECORDS
public static boolean getAutoIncrReducerProcCount(Configuration conf)
Get the flag which, if set to true, causes COUNTER_REDUCE_PROCESSED_GROUPS to be incremented by the framework after invoking the reduce function. This value must be set to false for applications which process the records asynchronously or buffer the input records, for example streaming. In such cases, applications should increment this counter on their own. Default value is true.
Parameters:
conf - the configuration
Returns:
true if COUNTER_REDUCE_PROCESSED_GROUPS is auto-incremented, false otherwise.
public static void setAutoIncrReducerProcCount(Configuration conf, boolean autoIncr)
Set the flag which, if set to true, causes COUNTER_REDUCE_PROCESSED_GROUPS to be incremented by the framework after invoking the reduce function. This value must be set to false for applications which process the records asynchronously or buffer the input records, for example streaming. In such cases, applications should increment this counter on their own. Default value is true.
Parameters:
conf - the configuration
autoIncr - whether to auto-increment COUNTER_REDUCE_PROCESSED_GROUPS
public static Path getSkipOutputPath(Configuration conf)
Parameters:
conf - the configuration
public static void setSkipOutputPath(JobConf conf, Path path)
Parameters:
conf - the configuration
path - skip output directory path
public static long getMapperMaxSkipRecords(Configuration conf)
Parameters:
conf - the configuration
public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)
Parameters:
conf - the configuration
maxSkipRecs - number of acceptable skip records
public static long getReducerMaxSkipGroups(Configuration conf)
Parameters:
conf - the configuration
public static void setReducerMaxSkipGroups(Configuration conf, long maxSkipGrps)
Parameters:
conf - the configuration
maxSkipGrps - number of acceptable skip groups
Copyright © 2023 Apache Software Foundation. All rights reserved.