org.apache.hadoop.mapred.lib
Class ChainReducer

java.lang.Object
  extended by org.apache.hadoop.mapred.lib.ChainReducer
All Implemented Interfaces:
Closeable, JobConfigurable, Reducer

public class ChainReducer
extends Object
implements Reducer

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.

For each record output by the Reducer, the Mapper classes are invoked in a chained (or piped) fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

The key functionality of this feature is that the Mappers in the chain do not need to be aware that they are executed after the Reducer or in a chain. This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.

Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed all Mappers and the Reduce in the chain use maching output and input key and value classes as no conversion is done by the chaining code.

Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. And immediate benefit of this pattern is a dramatic reduction in disk IO.

IMPORTANT: There is no need to specify the output key/value classes for the ChainReducer, this is done by the setReducer or the addMapper for the last element in the chain.

ChainReducer usage pattern:

 ...
 conf.setJobName("chain");
 conf.setInputFormat(TextInputFormat.class);
 conf.setOutputFormat(TextOutputFormat.class);
 

JobConf mapAConf = new JobConf(false); ... ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapAConf);

JobConf mapBConf = new JobConf(false); ... ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapBConf);

JobConf reduceConf = new JobConf(false); ... ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduceConf);

ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, null);

ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, LongWritable.class, LongWritable.class, true, null);

FileInputFormat.setInputPaths(conf, inDir); FileOutputFormat.setOutputPath(conf, outDir); ...

JobClient jc = new JobClient(conf); RunningJob job = jc.submitJob(conf); ...


Constructor Summary
ChainReducer()
          Constructor.
 
Method Summary
static
<K1,V1,K2,V2>
void
addMapper(JobConf job, Class<? extends Mapper<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf mapperConf)
          Adds a Mapper class to the chain job's JobConf.
 void close()
          Closes the ChainReducer, the Reducer and all the Mappers in the chain.
 void configure(JobConf job)
          Configures the ChainReducer, the Reducer and all the Mappers in the chain.
 void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter)
          Chains the reduce(...) method of the Reducer with the map(...) methods of the Mappers in the chain.
static
<K1,V1,K2,V2>
void
setReducer(JobConf job, Class<? extends Reducer<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf reducerConf)
          Sets the Reducer class to the chain job's JobConf.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ChainReducer

public ChainReducer()
Constructor.

Method Detail

setReducer

public static <K1,V1,K2,V2> void setReducer(JobConf job,
                                            Class<? extends Reducer<K1,V1,K2,V2>> klass,
                                            Class<? extends K1> inputKeyClass,
                                            Class<? extends V1> inputValueClass,
                                            Class<? extends K2> outputKeyClass,
                                            Class<? extends V2> outputValueClass,
                                            boolean byValue,
                                            JobConf reducerConf)
Sets the Reducer class to the chain job's JobConf.

It has to be specified how key and values are passed from one element of the chain to the next, by value or by reference. If a Reducer leverages the assumed semantics that the key and values are not modified by the collector 'by value' must be used. If the Reducer does not expect this semantics, as an optimization to avoid serialization and deserialization 'by reference' can be used.

For the added Reducer the configuration given for it, reducerConf, have precedence over the job's JobConf. This precedence is in effect when the task is running.

IMPORTANT: There is no need to specify the output key/value classes for the ChainReducer, this is done by the setReducer or the addMapper for the last element in the chain.

Parameters:
job - job's JobConf to add the Reducer class.
klass - the Reducer class to add.
inputKeyClass - reducer input key class.
inputValueClass - reducer input value class.
outputKeyClass - reducer output key class.
outputValueClass - reducer output value class.
byValue - indicates if key/values should be passed by value to the next Mapper in the chain, if any.
reducerConf - a JobConf with the configuration for the Reducer class. It is recommended to use a JobConf without default values using the JobConf(boolean loadDefaults) constructor with FALSE.

addMapper

public static <K1,V1,K2,V2> void addMapper(JobConf job,
                                           Class<? extends Mapper<K1,V1,K2,V2>> klass,
                                           Class<? extends K1> inputKeyClass,
                                           Class<? extends V1> inputValueClass,
                                           Class<? extends K2> outputKeyClass,
                                           Class<? extends V2> outputValueClass,
                                           boolean byValue,
                                           JobConf mapperConf)
Adds a Mapper class to the chain job's JobConf.

It has to be specified how key and values are passed from one element of the chain to the next, by value or by reference. If a Mapper leverages the assumed semantics that the key and values are not modified by the collector 'by value' must be used. If the Mapper does not expect this semantics, as an optimization to avoid serialization and deserialization 'by reference' can be used.

For the added Mapper the configuration given for it, mapperConf, have precedence over the job's JobConf. This precedence is in effect when the task is running.

IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper, this is done by the addMapper for the last mapper in the chain .

Parameters:
job - chain job's JobConf to add the Mapper class.
klass - the Mapper class to add.
inputKeyClass - mapper input key class.
inputValueClass - mapper input value class.
outputKeyClass - mapper output key class.
outputValueClass - mapper output value class.
byValue - indicates if key/values should be passed by value to the next Mapper in the chain, if any.
mapperConf - a JobConf with the configuration for the Mapper class. It is recommended to use a JobConf without default values using the JobConf(boolean loadDefaults) constructor with FALSE.

configure

public void configure(JobConf job)
Configures the ChainReducer, the Reducer and all the Mappers in the chain.

If this method is overriden super.configure(...) should be invoked at the beginning of the overwriter method.

Specified by:
configure in interface JobConfigurable
Parameters:
job - the configuration

reduce

public void reduce(Object key,
                   Iterator values,
                   OutputCollector output,
                   Reporter reporter)
            throws IOException
Chains the reduce(...) method of the Reducer with the map(...) methods of the Mappers in the chain.

Specified by:
reduce in interface Reducer
Parameters:
key - the key.
values - the list of values to reduce.
output - to collect keys and combined values.
reporter - facility to report progress.
Throws:
IOException

close

public void close()
           throws IOException
Closes the ChainReducer, the Reducer and all the Mappers in the chain.

If this method is overriden super.close() should be invoked at the end of the overwriter method.

Specified by:
close in interface Closeable
Throws:
IOException


Copyright © 2009 The Apache Software Foundation