001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.lib.output;
019
020import org.apache.hadoop.classification.InterfaceAudience;
021import org.apache.hadoop.classification.InterfaceStability;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.io.Text;
024import org.apache.hadoop.mapreduce.*;
025import org.apache.hadoop.mapreduce.Reducer.Context;
026import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
027import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
028import org.apache.hadoop.util.ReflectionUtils;
029
030import java.io.IOException;
031import java.util.*;
032
033/**
034 * The MultipleOutputs class simplifies writing output data 
035 * to multiple outputs
036 * 
037 * <p> 
038 * Case one: writing to additional outputs other than the job default output.
039 *
040 * Each additional output, or named output, may be configured with its own
041 * <code>OutputFormat</code>, with its own key class and with its own value
042 * class.
043 * </p>
044 * 
045 * <p>
046 * Case two: to write data to different files provided by user
047 * </p>
048 * 
049 * <p>
050 * MultipleOutputs supports counters, by default they are disabled. The 
051 * counters group is the {@link MultipleOutputs} class name. The names of the 
052 * counters are the same as the output name. These count the number records 
053 * written to each output name.
054 * </p>
055 * 
056 * Usage pattern for job submission:
057 * <pre>
058 *
059 * Job job = new Job();
060 *
061 * FileInputFormat.setInputPath(job, inDir);
062 * FileOutputFormat.setOutputPath(job, outDir);
063 *
064 * job.setMapperClass(MOMap.class);
065 * job.setReducerClass(MOReduce.class);
066 * ...
067 *
068 * // Defines additional single text based output 'text' for the job
069 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
070 * LongWritable.class, Text.class);
071 *
072 * // Defines additional sequence-file based output 'sequence' for the job
073 * MultipleOutputs.addNamedOutput(job, "seq",
074 *   SequenceFileOutputFormat.class,
075 *   LongWritable.class, Text.class);
076 * ...
077 *
078 * job.waitForCompletion(true);
079 * ...
080 * </pre>
081 * <p>
082 * Usage in Reducer:
083 * <pre>
084 * <K, V> String generateFileName(K k, V v) {
085 *   return k.toString() + "_" + v.toString();
086 * }
087 * 
088 * public class MOReduce extends
089 *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
090 * private MultipleOutputs mos;
091 * public void setup(Context context) {
092 * ...
093 * mos = new MultipleOutputs(context);
094 * }
095 *
096 * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
097 * Context context)
098 * throws IOException {
099 * ...
100 * mos.write("text", , key, new Text("Hello"));
101 * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
102 * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
103 * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
104 * ...
105 * }
106 *
107 * public void cleanup(Context) throws IOException {
108 * mos.close();
109 * ...
110 * }
111 *
112 * }
113 * </pre>
114 * 
115 * <p>
116 * When used in conjuction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
117 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
118 * from the old Hadoop API - ie, output can be written from the Reducer to more than one location.
119 * </p>
120 * 
121 * <p>
122 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and 
123 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output:
124 * </p>
125 * 
126 * <pre>
127 * private MultipleOutputs<Text, Text> out;
128 * 
129 * public void setup(Context context) {
130 *   out = new MultipleOutputs<Text, Text>(context);
131 *   ...
132 * }
133 * 
134 * public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
135 * for (Text t : values) {
136 *   out.write(key, t, generateFileName(<<i>parameter list...</i>>));
137 *   }
138 * }
139 * 
140 * protected void cleanup(Context context) throws IOException, InterruptedException {
141 *   out.close();
142 * }
143 * </pre>
144 * 
145 * <p>
146 * Use your own code in <code>generateFileName()</code> to create a custom path to your results. 
147 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system. 
148 * Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. 
149 * No call to <code>context.write()</code> is necessary. See example <code>generateFileName()</code> code below. 
150 * </p>
151 * 
152 * <pre>
153 * private String generateFileName(Text k) {
154 *   // expect Text k in format "Surname|Forename"
155 *   String[] kStr = k.toString().split("\\|");
156 *   
157 *   String sName = kStr[0];
158 *   String fName = kStr[1];
159 *
160 *   // example for k = Smith|John
161 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
162 *   return sName + "/" + fName;
163 * }
164 * </pre>
165 * 
166 * <p>
167 * Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000.
168 * To prevent this use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
169 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
170 * </p> 
171 * 
172 */
173@InterfaceAudience.Public
174@InterfaceStability.Stable
175public class MultipleOutputs<KEYOUT, VALUEOUT> {
176
177  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
178
179  private static final String MO_PREFIX = 
180    "mapreduce.multipleoutputs.namedOutput.";
181
182  private static final String FORMAT = ".format";
183  private static final String KEY = ".key";
184  private static final String VALUE = ".value";
185  private static final String COUNTERS_ENABLED = 
186    "mapreduce.multipleoutputs.counters";
187
188  /**
189   * Counters group used by the counters of MultipleOutputs.
190   */
191  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
192
193  /**
194   * Cache for the taskContexts
195   */
196  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
197  /**
198   * Cached TaskAttemptContext which uses the job's configured settings
199   */
200  private TaskAttemptContext jobOutputFormatContext;
201
202  /**
203   * Checks if a named output name is valid token.
204   *
205   * @param namedOutput named output Name
206   * @throws IllegalArgumentException if the output name is not valid.
207   */
208  private static void checkTokenName(String namedOutput) {
209    if (namedOutput == null || namedOutput.length() == 0) {
210      throw new IllegalArgumentException(
211        "Name cannot be NULL or emtpy");
212    }
213    for (char ch : namedOutput.toCharArray()) {
214      if ((ch >= 'A') && (ch <= 'Z')) {
215        continue;
216      }
217      if ((ch >= 'a') && (ch <= 'z')) {
218        continue;
219      }
220      if ((ch >= '0') && (ch <= '9')) {
221        continue;
222      }
223      throw new IllegalArgumentException(
224        "Name cannot be have a '" + ch + "' char");
225    }
226  }
227
228  /**
229   * Checks if output name is valid.
230   *
231   * name cannot be the name used for the default output
232   * @param outputPath base output Name
233   * @throws IllegalArgumentException if the output name is not valid.
234   */
235  private static void checkBaseOutputPath(String outputPath) {
236    if (outputPath.equals(FileOutputFormat.PART)) {
237      throw new IllegalArgumentException("output name cannot be 'part'");
238    }
239  }
240  
241  /**
242   * Checks if a named output name is valid.
243   *
244   * @param namedOutput named output Name
245   * @throws IllegalArgumentException if the output name is not valid.
246   */
247  private static void checkNamedOutputName(JobContext job,
248      String namedOutput, boolean alreadyDefined) {
249    checkTokenName(namedOutput);
250    checkBaseOutputPath(namedOutput);
251    List<String> definedChannels = getNamedOutputsList(job);
252    if (alreadyDefined && definedChannels.contains(namedOutput)) {
253      throw new IllegalArgumentException("Named output '" + namedOutput +
254        "' already alreadyDefined");
255    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
256      throw new IllegalArgumentException("Named output '" + namedOutput +
257        "' not defined");
258    }
259  }
260
261  // Returns list of channel names.
262  private static List<String> getNamedOutputsList(JobContext job) {
263    List<String> names = new ArrayList<String>();
264    StringTokenizer st = new StringTokenizer(
265      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
266    while (st.hasMoreTokens()) {
267      names.add(st.nextToken());
268    }
269    return names;
270  }
271
272  // Returns the named output OutputFormat.
273  @SuppressWarnings("unchecked")
274  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
275    JobContext job, String namedOutput) {
276    return (Class<? extends OutputFormat<?, ?>>)
277      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
278      OutputFormat.class);
279  }
280
281  // Returns the key class for a named output.
282  private static Class<?> getNamedOutputKeyClass(JobContext job,
283                                                String namedOutput) {
284    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
285      Object.class);
286  }
287
288  // Returns the value class for a named output.
289  private static Class<?> getNamedOutputValueClass(
290      JobContext job, String namedOutput) {
291    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
292      null, Object.class);
293  }
294
295  /**
296   * Adds a named output for the job.
297   * <p/>
298   *
299   * @param job               job to add the named output
300   * @param namedOutput       named output name, it has to be a word, letters
301   *                          and numbers only, cannot be the word 'part' as
302   *                          that is reserved for the default output.
303   * @param outputFormatClass OutputFormat class.
304   * @param keyClass          key class
305   * @param valueClass        value class
306   */
307  @SuppressWarnings("unchecked")
308  public static void addNamedOutput(Job job, String namedOutput,
309      Class<? extends OutputFormat> outputFormatClass,
310      Class<?> keyClass, Class<?> valueClass) {
311    checkNamedOutputName(job, namedOutput, true);
312    Configuration conf = job.getConfiguration();
313    conf.set(MULTIPLE_OUTPUTS,
314      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
315    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
316      OutputFormat.class);
317    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
318    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
319  }
320
321  /**
322   * Enables or disables counters for the named outputs.
323   * 
324   * The counters group is the {@link MultipleOutputs} class name.
325   * The names of the counters are the same as the named outputs. These
326   * counters count the number records written to each output name.
327   * By default these counters are disabled.
328   *
329   * @param job    job  to enable counters
330   * @param enabled indicates if the counters will be enabled or not.
331   */
332  public static void setCountersEnabled(Job job, boolean enabled) {
333    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
334  }
335
336  /**
337   * Returns if the counters for the named outputs are enabled or not.
338   * By default these counters are disabled.
339   *
340   * @param job    the job 
341   * @return TRUE if the counters are enabled, FALSE if they are disabled.
342   */
343  public static boolean getCountersEnabled(JobContext job) {
344    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
345  }
346
347  /**
348   * Wraps RecordWriter to increment counters. 
349   */
350  @SuppressWarnings("unchecked")
351  private static class RecordWriterWithCounter extends RecordWriter {
352    private RecordWriter writer;
353    private String counterName;
354    private TaskInputOutputContext context;
355
356    public RecordWriterWithCounter(RecordWriter writer, String counterName,
357                                   TaskInputOutputContext context) {
358      this.writer = writer;
359      this.counterName = counterName;
360      this.context = context;
361    }
362
363    @SuppressWarnings({"unchecked"})
364    public void write(Object key, Object value) 
365        throws IOException, InterruptedException {
366      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
367      writer.write(key, value);
368    }
369
370    public void close(TaskAttemptContext context) 
371        throws IOException, InterruptedException {
372      writer.close(context);
373    }
374  }
375
376  // instance code, to be used from Mapper/Reducer code
377
378  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
379  private Set<String> namedOutputs;
380  private Map<String, RecordWriter<?, ?>> recordWriters;
381  private boolean countersEnabled;
382  
383  /**
384   * Creates and initializes multiple outputs support,
385   * it should be instantiated in the Mapper/Reducer setup method.
386   *
387   * @param context the TaskInputOutputContext object
388   */
389  public MultipleOutputs(
390      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
391    this.context = context;
392    namedOutputs = Collections.unmodifiableSet(
393      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
394    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
395    countersEnabled = getCountersEnabled(context);
396  }
397
398  /**
399   * Write key and value to the namedOutput.
400   *
401   * Output path is a unique file generated for the namedOutput.
402   * For example, {namedOutput}-(m|r)-{part-number}
403   * 
404   * @param namedOutput the named output name
405   * @param key         the key
406   * @param value       the value
407   */
408  @SuppressWarnings("unchecked")
409  public <K, V> void write(String namedOutput, K key, V value)
410      throws IOException, InterruptedException {
411    write(namedOutput, key, value, namedOutput);
412  }
413
414  /**
415   * Write key and value to baseOutputPath using the namedOutput.
416   * 
417   * @param namedOutput    the named output name
418   * @param key            the key
419   * @param value          the value
420   * @param baseOutputPath base-output path to write the record to.
421   * Note: Framework will generate unique filename for the baseOutputPath
422   */
423  @SuppressWarnings("unchecked")
424  public <K, V> void write(String namedOutput, K key, V value,
425      String baseOutputPath) throws IOException, InterruptedException {
426    checkNamedOutputName(context, namedOutput, false);
427    checkBaseOutputPath(baseOutputPath);
428    if (!namedOutputs.contains(namedOutput)) {
429      throw new IllegalArgumentException("Undefined named output '" +
430        namedOutput + "'");
431    }
432    TaskAttemptContext taskContext = getContext(namedOutput);
433    getRecordWriter(taskContext, baseOutputPath).write(key, value);
434  }
435
436  /**
437   * Write key value to an output file name.
438   * 
439   * Gets the record writer from job's output format.  
440   * Job's output format should be a FileOutputFormat.
441   * 
442   * @param key       the key
443   * @param value     the value
444   * @param baseOutputPath base-output path to write the record to.
445   * Note: Framework will generate unique filename for the baseOutputPath
446   */
447  @SuppressWarnings("unchecked")
448  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
449      throws IOException, InterruptedException {
450    checkBaseOutputPath(baseOutputPath);
451    if (jobOutputFormatContext == null) {
452      jobOutputFormatContext = 
453        new TaskAttemptContextImpl(context.getConfiguration(), 
454                                   context.getTaskAttemptID(),
455                                   new WrappedStatusReporter(context));
456    }
457    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
458  }
459
460  // by being synchronized MultipleOutputTask can be use with a
461  // MultithreadedMapper.
462  @SuppressWarnings("unchecked")
463  private synchronized RecordWriter getRecordWriter(
464      TaskAttemptContext taskContext, String baseFileName) 
465      throws IOException, InterruptedException {
466    
467    // look for record-writer in the cache
468    RecordWriter writer = recordWriters.get(baseFileName);
469    
470    // If not in cache, create a new one
471    if (writer == null) {
472      // get the record writer from context output format
473      FileOutputFormat.setOutputName(taskContext, baseFileName);
474      try {
475        writer = ((OutputFormat) ReflectionUtils.newInstance(
476          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
477          .getRecordWriter(taskContext);
478      } catch (ClassNotFoundException e) {
479        throw new IOException(e);
480      }
481 
482      // if counters are enabled, wrap the writer with context 
483      // to increment counters 
484      if (countersEnabled) {
485        writer = new RecordWriterWithCounter(writer, baseFileName, context);
486      }
487      
488      // add the record-writer to the cache
489      recordWriters.put(baseFileName, writer);
490    }
491    return writer;
492  }
493
494   // Create a taskAttemptContext for the named output with 
495   // output format and output key/value types put in the context
496  private TaskAttemptContext getContext(String nameOutput) throws IOException {
497      
498    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
499    
500    if (taskContext != null) {
501        return taskContext;
502    }
503    
504    // The following trick leverages the instantiation of a record writer via
505    // the job thus supporting arbitrary output formats.
506    Job job = new Job(context.getConfiguration());
507    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
508    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
509    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
510    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
511        .getTaskAttemptID(), new WrappedStatusReporter(context));
512
513    taskContexts.put(nameOutput, taskContext);
514
515    return taskContext;
516  }
517
518  private static class WrappedStatusReporter extends StatusReporter {
519
520    TaskAttemptContext context;
521
522    public WrappedStatusReporter(TaskAttemptContext context) {
523      this.context = context;
524    }
525
526    @Override
527    public Counter getCounter(Enum<?> name) {
528      return context.getCounter(name);
529    }
530
531    @Override
532    public Counter getCounter(String group, String name) {
533      return context.getCounter(group, name);
534    }
535
536    @Override
537    public void progress() {
538      context.progress();
539    }
540
541    @Override
542    public float getProgress() {
543      return context.getProgress();
544    }
545    
546    @Override
547    public void setStatus(String status) {
548      context.setStatus(status);
549    }
550  }
551
552  /**
553   * Closes all the opened outputs.
554   * 
555   * This should be called from cleanup method of map/reduce task.
556   * If overridden subclasses must invoke <code>super.close()</code> at the
557   * end of their <code>close()</code>
558   * 
559   */
560  @SuppressWarnings("unchecked")
561  public void close() throws IOException, InterruptedException {
562    for (RecordWriter writer : recordWriters.values()) {
563      writer.close(context);
564    }
565  }
566}