/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters, which are disabled by default. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
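 *
 * <p>
 * For example, these per-output counters can be switched on during job setup
 * with the {@link #setCountersEnabled} method declared below:
 * </p>
 * <pre>
 * MultipleOutputs.setCountersEnabled(job, true);
 * </pre>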
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = Job.getInstance();
 *
 * FileInputFormat.addInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context)
 *       throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context) throws IOException, InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and
 * MultipleSequenceFileOutputFormat from the old Hadoop API, i.e. output can be
 * written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code>
 * to write key and value to a path specified by <code>baseOutputPath</code>,
 * with no need to specify a named output:
 * </p>
 *
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 *
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 *
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
 *     throws IOException, InterruptedException {
 *   for (Text t : values) {
 *     out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, append "part" or a similar leaf name to your custom-generated path; otherwise
 * your output files will be named just -r-00000, -r-00001, etc. No call to
 * <code>context.write()</code> is necessary. See the example <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output
 * files, e.g. part-r-00000. To prevent this, use
 * <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code>
 * in your Hadoop job configuration.
 * </p>
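 *
 * <p>
 * For example, a minimal job-setup sketch combining the two (the mapper,
 * reducer, and path names are placeholders from the examples above):
 * </p>
 * <pre>
 * Job job = Job.getInstance();
 * FileOutputFormat.setOutputPath(job, outDir);
 * // wraps the real output format so the default output is created lazily,
 * // avoiding empty part files when nothing is written to the default output
 * LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * </pre>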
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts.
   */
  private Map<String, TaskAttemptContext> taskContexts =
    new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
    // only letters and digits are allowed in a named output name
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if the output name is valid.
   *
   * The name cannot be the name used for the default output ("part").
   * @param outputPath base output path
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns the list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                 String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name; it has to be a word, letters
   *                          and numbers only, and cannot be the word 'part'
   *                          as that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
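   *
   * When enabled, the per-output record counts can be read back after the
   * job completes, for example (a sketch, assuming a named output "text"):
   * <pre>
   * long written = job.getCounters().findCounter(
   *   MultipleOutputs.class.getName(), "text").getValue();
   * </pre>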
   *
   * @param job     job to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps a RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings("unchecked")
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * Output path is a unique file generated for the namedOutput.
   * For example, {namedOutput}-(m|r)-{part-number}
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: Framework will generate unique filename for the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: Framework will generate unique filename for the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    // lazily create a context that reuses the job's own output format settings
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // by being synchronized MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for the record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in the cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with the
  // output format and output key/value types put in the context.
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job, thus supporting arbitrary output formats.
    Job job = Job.getInstance(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(),
      context.getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}