@InterfaceAudience.Public @InterfaceStability.Stable public class MultipleOutputs<KEYOUT,VALUEOUT> extends Object
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.
Case two: writing data to different files whose paths are provided by the user.
MultipleOutputs supports counters, which are disabled by default. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output names. Each counter counts the number of records written to its output.
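To make the counter naming concrete, here is a minimal plain-Java model of the bookkeeping described above. It does not call Hadoop: the class `NamedOutputCounters` and its methods are hypothetical, and the group name is simply the fully qualified name of MultipleOutputs written as a string literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the counter bookkeeping; not part of Hadoop.
class NamedOutputCounters {
    // The counters group is the MultipleOutputs class name.
    static final String GROUP = "org.apache.hadoop.mapreduce.lib.output.MultipleOutputs";

    private final boolean enabled;
    private final Map<String, Long> counts = new LinkedHashMap<>();

    NamedOutputCounters(boolean enabled) {
        this.enabled = enabled;
    }

    // Called once per record written to the given named output.
    void recordWritten(String namedOutput) {
        if (enabled) {
            counts.merge(namedOutput, 1L, Long::sum);
        }
    }

    // Records counted for an output; 0 if nothing was written or counters are disabled.
    long get(String namedOutput) {
        return counts.getOrDefault(namedOutput, 0L);
    }

    public static void main(String[] args) {
        NamedOutputCounters counters = new NamedOutputCounters(true);
        counters.recordWritten("text");
        counters.recordWritten("seq");
        counters.recordWritten("seq");
        System.out.println(GROUP + ": text=" + counters.get("text") + ", seq=" + counters.get("seq"));
    }
}
```

In a real job the counters are switched on before submission with MultipleOutputs.setCountersEnabled(job, true).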
Usage pattern for job submission:

    Job job = new Job();

    FileInputFormat.setInputPaths(job, inDir);
    FileOutputFormat.setOutputPath(job, outDir);

    job.setMapperClass(MOMap.class);
    job.setReducerClass(MOReduce.class);
    ...

    // Defines additional single text based output 'text' for the job
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
        LongWritable.class, Text.class);

    // Defines additional sequence-file based output 'seq' for the job
    MultipleOutputs.addNamedOutput(job, "seq", SequenceFileOutputFormat.class,
        LongWritable.class, Text.class);
    ...

    job.waitForCompletion(true);
    ...
Usage in Reducer:
    <K, V> String generateFileName(K k, V v) {
      return k.toString() + "_" + v.toString();
    }

    public class MOReduce extends
        Reducer<WritableComparable, Writable, WritableComparable, Writable> {
      private MultipleOutputs mos;

      public void setup(Context context) {
        ...
        mos = new MultipleOutputs(context);
      }

      public void reduce(WritableComparable key, Iterable<Writable> values,
          Context context) throws IOException, InterruptedException {
        ...
        // write to the named outputs declared at job submission
        mos.write("text", key, new Text("Hello"));
        mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
        mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
        // write to a generated path, without a named output
        mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
        ...
      }

      public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
        ...
      }
    }
When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat from the old Hadoop API, i.e. output can be written from the Reducer to more than one location.
Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath) to write key and value to a path specified by baseOutputPath, with no need to specify a named output.

Warning: when the baseOutputPath passed to MultipleOutputs.write is a path that resolves outside of the final job output directory, the directory is created immediately and then persists through subsequent task retries, breaking the concept of output committing.

A reducer that uses this method looks like:
    private MultipleOutputs<Text, Text> out;

    public void setup(Context context) {
      out = new MultipleOutputs<Text, Text>(context);
      ...
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text t : values) {
        out.write(key, t, generateFileName(<parameter list...>));
      }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
      out.close();
    }
Use your own code in generateFileName() to create a custom path to your results. '/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append "part" or similar to your custom-generated path; otherwise your output files will be named -00000, -00001, etc. No call to context.write() is necessary. See the example generateFileName() code below.
    private String generateFileName(Text k) {
      // expect Text k in format "Surname|Forename"
      String[] kStr = k.toString().split("\\|");

      String sName = kStr[0];
      String fName = kStr[1];

      // example for k = Smith|John
      // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
      return sName + "/" + fName;
    }
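The example above assumes every key contains exactly one '|'. A more defensive variant might validate the key before building the path. The following is a minimal sketch of that idea; it works on a plain String so that it runs without Hadoop, and the class name `FileNames` and the fallback path `_malformed/part` are hypothetical choices, not anything the library prescribes.

```java
// Hypothetical helper; operates on String so it runs without Hadoop on the classpath.
class FileNames {
    // Fallback for keys that are not in "Surname|Forename" form (an assumption of this sketch).
    static final String MALFORMED = "_malformed/part";

    static String generateFileName(String key) {
        // A limit of -1 keeps trailing empty strings, so "Smith|" still yields two parts.
        String[] parts = key.split("\\|", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return MALFORMED;
        }
        // '/' becomes a directory level under the job output directory.
        return parts[0] + "/" + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(generateFileName("Smith|John"));
        System.out.println(generateFileName("no-separator"));
    }
}
```

Inside a reducer the call would take the form generateFileName(key.toString()).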
Using MultipleOutputs in this way will still create zero-sized default output files, e.g. part-00000. To prevent this, use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.
Constructor and Description |
---|
MultipleOutputs(TaskInputOutputContext<?,?,KEYOUT,VALUEOUT> context) Creates and initializes multiple outputs support, it should be instantiated in the Mapper/Reducer setup method. |
Modifier and Type | Method and Description |
---|---|
static void | addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass) Adds a named output for the job. |
void | close() Closes all the opened outputs. |
static boolean | getCountersEnabled(JobContext job) Returns if the counters for the named outputs are enabled or not. |
static void | setCountersEnabled(Job job, boolean enabled) Enables or disables counters for the named outputs. |
void | write(KEYOUT key, VALUEOUT value, String baseOutputPath) Write key value to an output file name. |
<K,V> void | write(String namedOutput, K key, V value) Write key and value to the namedOutput. |
<K,V> void | write(String namedOutput, K key, V value, String baseOutputPath) Write key and value to baseOutputPath using the namedOutput. |
public MultipleOutputs(TaskInputOutputContext<?,?,KEYOUT,VALUEOUT> context)
Creates and initializes multiple outputs support; it should be instantiated in the Mapper/Reducer setup method.
Parameters:
context - the TaskInputOutputContext object

public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass)
Adds a named output for the job.
Parameters:
job - job to add the named output
namedOutput - named output name; it has to be a word, letters and numbers only, and cannot be the word 'part' as that is reserved for the default output.
outputFormatClass - OutputFormat class.
keyClass - key class
valueClass - value class

public static void setCountersEnabled(Job job, boolean enabled)
Enables or disables counters for the named outputs. The counters group is the MultipleOutputs class name. The names of the counters are the same as the named outputs. These counters count the number of records written to each output name. By default these counters are disabled.
Parameters:
job - job to enable counters
enabled - indicates if the counters will be enabled or not.

public static boolean getCountersEnabled(JobContext job)
Returns if the counters for the named outputs are enabled or not.
Parameters:
job - the job

public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException
Write key and value to the namedOutput.
Parameters:
namedOutput - the named output name
key - the key
value - the value
Throws:
IOException
InterruptedException
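The namedOutput passed to this method is a name registered through addNamedOutput, whose parameter description restricts it to letters and numbers and reserves 'part' for the default output. That rule can be checked up front. The sketch below is a hypothetical validator written from the documented rule, not Hadoop's own check; treating "letters and numbers" as ASCII only and rejecting empty names are assumptions of this sketch.

```java
// Hypothetical validator mirroring the documented naming rule; not Hadoop's own code.
class NamedOutputNames {
    static boolean isValid(String namedOutput) {
        if (namedOutput == null || namedOutput.isEmpty()) {
            return false; // assumption: an empty name is not a usable output name
        }
        if (namedOutput.equals("part")) {
            return false; // 'part' is reserved for the default output
        }
        for (char c : namedOutput.toCharArray()) {
            boolean asciiLetter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            boolean asciiDigit = c >= '0' && c <= '9';
            if (!asciiLetter && !asciiDigit) {
                return false; // letters and numbers only
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"text", "seq", "part", "seq_a"}) {
            System.out.println(name + " -> " + isValid(name));
        }
    }
}
```

Note that a name such as "seq_a" fails this check as a named output, yet the usage example on this page passes "seq_a" as a baseOutputPath, which is a separate argument.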
public <K,V> void write(String namedOutput, K key, V value, String baseOutputPath) throws IOException, InterruptedException
Write key and value to baseOutputPath using the namedOutput.
Parameters:
namedOutput - the named output name
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath. Warning: when the baseOutputPath is a path that resolves outside of the final job output directory, the directory is created immediately and then persists through subsequent task retries, breaking the concept of output committing.
Throws:
IOException
InterruptedException
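The note above says the framework generates a unique filename from baseOutputPath. Judging by the example path Smith/John-r-00000 elsewhere on this page, the generated name appears to be the base path followed by a task-type letter and a zero-padded partition number. The sketch below models only that shape. `OutputFileNames.uniqueFile` is a hypothetical helper, and the five-digit padding and the single-letter task type are inferred from that example rather than taken from Hadoop's implementation.

```java
import java.util.Locale;

// Hypothetical model of the generated file name; the format is inferred from
// the "Smith/John-r-00000" example, not copied from Hadoop.
class OutputFileNames {
    static String uniqueFile(String baseOutputPath, char taskType, int partition) {
        // base path, then "-<task type>-", then the partition padded to five digits
        return String.format(Locale.ROOT, "%s-%c-%05d", baseOutputPath, taskType, partition);
    }

    public static void main(String[] args) {
        System.out.println(uniqueFile("Smith/John", 'r', 0));
        // A base path ending in '/' leaves the file name starting with '-',
        // which is why appending "part" or similar is recommended.
        System.out.println(uniqueFile("Smith/", 'r', 0));
        System.out.println(uniqueFile("Smith/part", 'r', 0));
    }
}
```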
public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) throws IOException, InterruptedException
Write key and value to an output file name.
Parameters:
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath. Warning: when the baseOutputPath is a path that resolves outside of the final job output directory, the directory is created immediately and then persists through subsequent task retries, breaking the concept of output committing.
Throws:
IOException
InterruptedException
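One way to avoid the situation described in the warning is to reject any baseOutputPath that would escape the job output directory before calling write. The following is a minimal sketch of such a guard. `OutputPathGuard` is hypothetical, and the check is purely lexical and uses java.nio.file path syntax as a stand-in, so a real job would need the equivalent logic on Hadoop's own path type.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical guard; a lexical check only, using local path syntax as a stand-in.
class OutputPathGuard {
    // True if baseOutputPath, resolved against outputDir, stays inside outputDir.
    static boolean staysInside(String outputDir, String baseOutputPath) {
        Path root = Paths.get(outputDir).normalize();
        // An absolute baseOutputPath replaces root entirely; ".." segments are collapsed.
        Path resolved = root.resolve(baseOutputPath).normalize();
        return resolved.startsWith(root);
    }

    public static void main(String[] args) {
        String out = "/user/hadoop/path/to/output";
        System.out.println(staysInside(out, "Smith/John"));
        System.out.println(staysInside(out, "../elsewhere/John"));
        System.out.println(staysInside(out, "/tmp/John"));
    }
}
```

A reducer could call such a check on the result of generateFileName() and fall back to a path inside the output directory when it fails.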
public void close() throws IOException, InterruptedException
Closes all the opened outputs. If overridden, subclasses must invoke super.close() at the end of their close().
Throws:
IOException
InterruptedException
Copyright © 2021 Apache Software Foundation. All rights reserved.