org.apache.hadoop.mapreduce.lib.output
Class MultipleOutputs<KEYOUT,VALUEOUT>

java.lang.Object
  extended by org.apache.hadoop.mapreduce.lib.output.MultipleOutputs<KEYOUT,VALUEOUT>

@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT,VALUEOUT>
extends Object

The MultipleOutputs class simplifies writing output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: writing data to different files whose paths are provided by the user.

MultipleOutputs supports counters; by default they are disabled. The counters group is the MultipleOutputs class name. The names of the counters are the same as the output names. Each counter counts the number of records written to its output name.

Usage pattern for job submission:

 Job job = Job.getInstance();

 FileInputFormat.setInputPaths(job, inDir);
 FileOutputFormat.setOutputPath(job, outDir);

 job.setMapperClass(MOMap.class);
 job.setReducerClass(MOReduce.class);
 ...

 // Defines additional single text based output 'text' for the job
 MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 LongWritable.class, Text.class);

 // Defines additional sequence-file based output 'seq' for the job
 MultipleOutputs.addNamedOutput(job, "seq",
   SequenceFileOutputFormat.class,
   LongWritable.class, Text.class);
 ...

 job.waitForCompletion(true);
 ...
 

Usage in Reducer:

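 public class MOReduce extends
   Reducer<WritableComparable, Writable, WritableComparable, Writable> {
   private MultipleOutputs<WritableComparable, Writable> mos;

   // Builds a base output path from the key and the value
   private static <K, V> String generateFileName(K k, V v) {
     return k.toString() + "_" + v.toString();
   }

   @Override
   public void setup(Context context) {
     ...
     mos = new MultipleOutputs<WritableComparable, Writable>(context);
   }

   @Override
   public void reduce(WritableComparable key, Iterable<Writable> values,
       Context context) throws IOException, InterruptedException {
     ...
     // Named output 'text', file name generated by the framework
     mos.write("text", key, new Text("Hello"));
     // Named output 'seq', written under custom base output paths
     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
     // Job's default output format, written under a custom base output path
     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
     ...
   }

   @Override
   public void cleanup(Context context) throws IOException, InterruptedException {
     mos.close();
     ...
   }
 }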
 

When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat from the old Hadoop API, i.e., output can be written from the Reducer to more than one location.

Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath) to write key and value to a path specified by baseOutputPath, with no need to specify a named output:

 private MultipleOutputs<Text, Text> out;
 
 public void setup(Context context) {
   out = new MultipleOutputs<Text, Text>(context);
   ...
 }
 
 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
   for (Text t : values) {
     out.write(key, t, generateFileName(<parameter list...>));
   }
 }
 
 protected void cleanup(Context context) throws IOException, InterruptedException {
   out.close();
 }
 

Use your own code in generateFileName() to create a custom path for your results. '/' characters in baseOutputPath are translated into directory levels in your file system. Also, end your custom-generated path with "part" or similar; otherwise your output files will be named -00000, -00001, etc. No call to context.write() is necessary. See the example generateFileName() code below.

 private String generateFileName(Text k) {
   // expect Text k in format "Surname|Forename"
   String[] kStr = k.toString().split("\\|");
   
   String sName = kStr[0];
   String fName = kStr[1];

   // example for k = Smith|John
   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
   return sName + "/" + fName;
 }
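
The helper above assumes that every key contains a '|' separator; a key without one would fail on kStr[1]. A more defensive variant might look like the following minimal sketch, written against plain String so that it runs without Hadoop on the classpath. The class name OutputPaths, the method baseOutputPath, and the "unknown" fallback directory are hypothetical names chosen for illustration; the trailing "/part" follows the advice above to end the path with "part" or similar.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: builds a baseOutputPath from a "Surname|Forename" key. */
final class OutputPaths {
    private static final Pattern SEPARATOR = Pattern.compile("\\|");

    private OutputPaths() {}

    /**
     * Returns "Surname/Forename/part" for a well-formed key, or
     * "unknown/part" (an assumed fallback) when the key is malformed.
     */
    static String baseOutputPath(String key) {
        // Limit -1 keeps trailing empty strings, so "Smith|" yields two fields.
        String[] fields = SEPARATOR.split(key, -1);
        if (fields.length != 2 || fields[0].isEmpty() || fields[1].isEmpty()) {
            return "unknown/part";
        }
        // Each '/' becomes a directory level under the job output directory.
        return fields[0] + "/" + fields[1] + "/part";
    }
}
```

In a reducer this could be called as out.write(key, t, OutputPaths.baseOutputPath(key.toString())). For the key Smith|John the reducer output would then be expected under Smith/John/part-r-00000 rather than Smith/John-r-00000.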
 

Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-00000. To prevent this, use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.


Constructor Summary
MultipleOutputs(TaskInputOutputContext<?,?,KEYOUT,VALUEOUT> context)
          Creates and initializes multiple outputs support. It should be instantiated in the Mapper/Reducer setup method.
 
Method Summary
static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass)
          Adds a named output for the job.
 void close()
          Closes all the opened outputs.
static boolean getCountersEnabled(JobContext job)
          Returns whether the counters for the named outputs are enabled.
static void setCountersEnabled(Job job, boolean enabled)
          Enables or disables counters for the named outputs.
 void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
          Write key and value to an output file name.
<K,V> void
write(String namedOutput, K key, V value)
          Write key and value to the namedOutput.
<K,V> void
write(String namedOutput, K key, V value, String baseOutputPath)
          Write key and value to baseOutputPath using the namedOutput.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

MultipleOutputs

public MultipleOutputs(TaskInputOutputContext<?,?,KEYOUT,VALUEOUT> context)
Creates and initializes multiple outputs support. It should be instantiated in the Mapper/Reducer setup method.

Parameters:
context - the TaskInputOutputContext object

Method Detail

addNamedOutput

public static void addNamedOutput(Job job,
                                  String namedOutput,
                                  Class<? extends OutputFormat> outputFormatClass,
                                  Class<?> keyClass,
                                  Class<?> valueClass)
Adds a named output for the job.

Parameters:
job - job to add the named output
namedOutput - name of the named output; it must be a single word made of letters and numbers only, and it cannot be the word 'part', which is reserved for the default output.
outputFormatClass - OutputFormat class.
keyClass - key class
valueClass - value class
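
The naming rule for namedOutput can be checked before the job is configured. The following is a minimal sketch of such a check in plain Java, not Hadoop's own validation code. The class NamedOutputNames and the method isValid are hypothetical, and reading "letters and numbers" as ASCII letters and digits only is an assumption.

```java
/** Hypothetical pre-flight check mirroring the documented naming rule. */
final class NamedOutputNames {
    private NamedOutputNames() {}

    /**
     * Returns true if the name is non-empty, consists only of ASCII letters
     * and digits (an assumed reading of the rule), and is not "part".
     */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            boolean letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            boolean digit = c >= '0' && c <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, "text" and "seq" from the usage pattern pass the check, while "part" and names containing punctuation such as "my_output" do not.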

setCountersEnabled

public static void setCountersEnabled(Job job,
                                      boolean enabled)
Enables or disables counters for the named outputs. The counters group is the MultipleOutputs class name. The names of the counters are the same as the named outputs. These counters count the number of records written to each output name. By default these counters are disabled.

Parameters:
job - job to enable counters
enabled - indicates if the counters will be enabled or not.

getCountersEnabled

public static boolean getCountersEnabled(JobContext job)
Returns whether the counters for the named outputs are enabled. By default these counters are disabled.

Parameters:
job - the job
Returns:
true if the counters are enabled, false if they are disabled.

write

public <K,V> void write(String namedOutput,
                        K key,
                        V value)
           throws IOException,
                  InterruptedException
Write key and value to the namedOutput. The output path is a unique file generated for the namedOutput, for example {namedOutput}-(m|r)-{part-number}.

Parameters:
namedOutput - the named output name
key - the key
value - the value
Throws:
IOException
InterruptedException
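
To make the {namedOutput}-(m|r)-{part-number} pattern concrete, the sketch below formats such a name in plain Java. It is an illustration only and does not reproduce the framework's code: the class OutputFileNames and the method fileName are hypothetical, the five-digit zero padding is inferred from the examples on this page (such as John-r-00000), and any extension added by the OutputFormat is ignored.

```java
import java.util.Locale;

/** Hypothetical illustration of the {name}-(m|r)-{part-number} pattern. */
final class OutputFileNames {
    private OutputFileNames() {}

    /**
     * Formats a file name for the given base name, task type and partition.
     * The five-digit padding is an assumption taken from the page's examples.
     */
    static String fileName(String baseName, boolean isMapTask, int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be non-negative");
        }
        // 'm' marks output written by a map task, 'r' by a reduce task.
        char taskType = isMapTask ? 'm' : 'r';
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "%s-%c-%05d", baseName, taskType, partition);
    }
}
```

For instance, fileName("text", false, 0) gives text-r-00000, the name one would expect for records written to the named output "text" by the first reduce task.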

write

public <K,V> void write(String namedOutput,
                        K key,
                        V value,
                        String baseOutputPath)
           throws IOException,
                  InterruptedException
Write key and value to baseOutputPath using the namedOutput.

Parameters:
namedOutput - the named output name
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException

write

public void write(KEYOUT key,
                  VALUEOUT value,
                  String baseOutputPath)
           throws IOException,
                  InterruptedException
Write key and value to an output file name. Gets the record writer from the job's output format, which should be a FileOutputFormat.

Parameters:
key - the key
value - the value
baseOutputPath - base-output path to write the record to. Note: the framework will generate a unique filename for the baseOutputPath.
Throws:
IOException
InterruptedException

close

public void close()
           throws IOException,
                  InterruptedException
Closes all the opened outputs. This should be called from the cleanup method of the map/reduce task. If overridden, subclasses must invoke super.close() at the end of their close().

Throws:
IOException
InterruptedException


Copyright © 2014 Apache Software Foundation. All Rights Reserved.