Interface Summary | |
---|---|
ValueAggregator | This interface defines the minimal protocol for value aggregators. |
ValueAggregatorDescriptor | This interface defines the contract a value aggregator descriptor must support. |
Class Summary | |
---|---|
DoubleValueSum | This class implements a value aggregator that sums up a sequence of double values. |
LongValueMax | This class implements a value aggregator that maintains the maximum of a sequence of long values. |
LongValueMin | This class implements a value aggregator that maintains the minimum of a sequence of long values. |
LongValueSum | This class implements a value aggregator that sums up a sequence of long values. |
StringValueMax | This class implements a value aggregator that maintains the biggest of a sequence of strings. |
StringValueMin | This class implements a value aggregator that maintains the smallest of a sequence of strings. |
UniqValueCount | This class implements a value aggregator that dedupes a sequence of objects. |
UserDefinedValueAggregatorDescriptor | This class implements a wrapper for a user defined value aggregator descriptor. |
ValueAggregatorBaseDescriptor | This class implements the functionality common to implementations of the ValueAggregatorDescriptor interface. |
ValueAggregatorCombiner<K1 extends WritableComparable,V1 extends Writable> | This class implements the generic combiner of Aggregate. |
ValueAggregatorJob | This is the main class for creating a map/reduce job using Aggregate framework. |
ValueAggregatorJobBase<K1 extends WritableComparable,V1 extends Writable> | This abstract class implements some common functionality of the generic mapper, reducer and combiner classes of Aggregate. |
ValueAggregatorMapper<K1 extends WritableComparable,V1 extends Writable> | This class implements the generic mapper of Aggregate. |
ValueAggregatorReducer<K1 extends WritableComparable,V1 extends Writable> | This class implements the generic reducer of Aggregate. |
ValueHistogram | This class implements a value aggregator that computes the histogram of a sequence of strings. |
Classes for performing various kinds of counting and aggregation.
The utility class ValueAggregatorJob offers a static function that creates map/reduce jobs:

    public static JobConf createValueAggregatorJob(String args[]) throws IOException;

To call this function, the user needs to pass in arguments specifying the input directories, the output directory, the number of reducers, the input data format (textinputformat or sequencefileinputformat), and a file specifying the user plugin class(es) to be loaded by the mapper. A user plugin class is responsible for specifying which aggregators to use and which values go to which aggregators. A plugin class must implement the following interface:

    public interface ValueAggregatorDescriptor {
      public ArrayList<Entry> generateKeyValPairs(Object key, Object value);
      public void configure(JobConf job);
    }

Function generateKeyValPairs generates aggregation key/value pairs for the input key/value pair. Each aggregation key encodes two pieces of information: the aggregation type and the aggregation ID. The value is the value to be aggregated onto the aggregation ID according to the aggregation type. Here is a simple example user plugin class for counting the words in the input texts:

    public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
      public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
        String words[] = val.toString().split(" |\t");
        ArrayList<Entry> retv = new ArrayList<Entry>();
        for (int i = 0; i < words.length; i++) {
          retv.add(generateEntry(LONG_VALUE_SUM, words[i], ONE));
        }
        return retv;
      }
      public void configure(JobConf job) {}
    }

In the above code, LONG_VALUE_SUM is a string denoting the aggregation type LongValueSum, which sums over long values. ONE denotes the string "1". Function generateEntry(LONG_VALUE_SUM, words[i], ONE) interprets the first argument as an aggregation type, the second as an aggregation ID, and the third as the value to be aggregated. The output key will look like "LongValueSum:XXXX", where XXXX is the string value of words[i]. The value will be "1". The mapper calls generateKeyValPairs(Object key, Object val) for each input key/value pair to generate the desired aggregation ID/value pairs. The downstream combiner/reducer interprets these pairs as adding one to the aggregator XXXX. Class ValueAggregatorBaseDescriptor is a base class that user plugin classes can extend. Here is the XML fragment specifying the user plugin class:

    <property>
      <name>aggregator.descriptor.num</name>
      <value>1</value>
    </property>
    <property>
      <name>aggregator.descriptor.0</name>
      <value>UserDefined,org.apache.hadoop.mapred.lib.aggregate.examples.WordCountAggregatorDescriptor</value>
    </property>

Class ValueAggregatorBaseDescriptor itself provides a default implementation for generateKeyValPairs:

    public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
      ArrayList<Entry> retv = new ArrayList<Entry>();
      String countType = LONG_VALUE_SUM;
      String id = "record_count";
      retv.add(generateEntry(countType, id, ONE));
      return retv;
    }

Thus, if no user plugin class is specified, the default behavior of the map/reduce job is to count the number of records (lines) in the input files. At runtime, the mapper invokes the generateKeyValPairs function for each input key/value pair and emits the generated key/value pairs:

    public void map(WritableComparable key, Writable value,
                    OutputCollector output, Reporter reporter) throws IOException {
      Iterator iter = this.aggregatorDescriptorList.iterator();
      while (iter.hasNext()) {
        ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
        Iterator<Entry> ens = ad.generateKeyValPairs(key, value).iterator();
        while (ens.hasNext()) {
          Entry en = ens.next();
          output.collect((WritableComparable) en.getKey(), (Writable) en.getValue());
        }
      }
    }

The reducer creates an aggregator object for each key/value-list pair and performs the appropriate aggregation. At the end, it emits the aggregator's results:

    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter) throws IOException {
      String keyStr = key.toString();
      int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
      String type = keyStr.substring(0, pos);
      keyStr = keyStr.substring(pos + ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
      ValueAggregator aggregator = ValueAggregatorBaseDescriptor.generateValueAggregator(type);
      while (values.hasNext()) {
        aggregator.addNextValue(values.next());
      }
      String val = aggregator.getReport();
      key = new Text(keyStr);
      output.collect(key, new Text(val));
    }

In order to be able to use a combiner, all the aggregation types must be associative and commutative. The supported types correspond to the value aggregator classes listed in the Class Summary above, such as LongValueSum, DoubleValueSum, UniqValueCount, and ValueHistogram.
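The associativity and commutativity requirement for combiners can be illustrated without Hadoop. The following is a minimal sketch in plain Java: `CombinerSketch`, `sum`, and `mean` are hypothetical names used only for illustration and are not part of the Aggregate package. It shows that summing partial sums gives the same answer as summing everything at once, whereas averaging partial averages does not, which is why an operation like a plain mean cannot be pushed into a combiner as-is.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSketch {
    // Hypothetical stand-in for a sum aggregator (not the Hadoop class).
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Hypothetical mean, used only as a counter-example.
    static double mean(List<Long> values) {
        return (double) sum(values) / values.size();
    }

    public static void main(String[] args) {
        List<Long> all = Arrays.asList(3L, 1L, 4L, 1L, 5L);
        // Pretend two mappers each saw part of the data and combined locally.
        List<Long> partA = all.subList(0, 2);
        List<Long> partB = all.subList(2, 5);

        // Sum: combining the partial results matches the direct result.
        long combinedSum = sum(Arrays.asList(sum(partA), sum(partB)));
        System.out.println(combinedSum == sum(all));    // prints true

        // Mean: averaging the partial means does not match the direct mean.
        double combinedMean = (mean(partA) + mean(partB)) / 2.0;
        System.out.println(combinedMean == mean(all));  // prints false
    }
}
```

A sum-like aggregator can therefore run unchanged as combiner and reducer, while a mean would have to carry a (sum, count) pair to stay combinable.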
1. Create a Java class that extends ValueAggregatorBaseDescriptor:

    import java.util.ArrayList;
    import java.util.Map.Entry;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;

    public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
      public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
        ArrayList<Entry> retv = new ArrayList<Entry>();
        // Add entries here with generateEntry(type, id, value).
        return retv;
      }
      public void configure(JobConf job) {
      }
    }

2. Create an xml file specifying the user plugin.

3. Compile your java class and create a jar file, say wc.jar.

Finally, run the job:

    hadoop jar wc.jar org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file
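To get a feel for what such a word-count job computes, the map and reduce steps described in this package can be imitated locally. The sketch below is plain Java with no Hadoop dependency; `AggregateWordCountSketch` and its methods are hypothetical, and the ":" separator is inferred from the "LongValueSum:XXXX" key format mentioned in the description rather than taken from the library. Unlike the plugin example, it also skips empty tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AggregateWordCountSketch {
    // Assumption: separator inferred from the "LongValueSum:XXXX" key format.
    static final String TYPE_SEPARATOR = ":";
    static final String LONG_VALUE_SUM = "LongValueSum";

    // Map side: emit one ("LongValueSum:<word>", "1") pair per word.
    static List<String[]> generateKeyValPairs(String line) {
        List<String[]> pairs = new ArrayList<>();
        for (String word : line.split(" |\t")) {
            if (word.isEmpty()) {
                continue; // sketch-only choice: ignore empty tokens
            }
            pairs.add(new String[] { LONG_VALUE_SUM + TYPE_SEPARATOR + word, "1" });
        }
        return pairs;
    }

    // Reduce side: split each key into type and ID, then sum values per ID.
    static Map<String, Long> aggregate(List<String[]> pairs) {
        Map<String, Long> sums = new TreeMap<>();
        for (String[] pair : pairs) {
            int pos = pair[0].indexOf(TYPE_SEPARATOR);
            String type = pair[0].substring(0, pos);
            String id = pair[0].substring(pos + TYPE_SEPARATOR.length());
            if (!LONG_VALUE_SUM.equals(type)) {
                throw new IllegalArgumentException("Unsupported type in sketch: " + type);
            }
            sums.merge(id, Long.parseLong(pair[1]), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<>();
        for (String line : new String[] { "a b a", "b\tc" }) {
            pairs.addAll(generateKeyValPairs(line));
        }
        System.out.println(aggregate(pairs)); // prints {a=2, b=2, c=1}
    }
}
```

The real job distributes these two steps across mappers and reducers; the sketch only mirrors the key encoding and the per-ID summation.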