/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output name. These count the number of records
 * written to each output name.
 * </p>
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines an additional text-based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines an additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context) throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"),
 *       generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context)
 *       throws IOException, InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts
   */
  private Map<String, TaskAttemptContext> taskContexts =
      new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

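  // For illustration: per the checks above, a purely alphanumeric name such as
  // "text1" is accepted, while names containing any other character, e.g.
  // "text-1" or "error_log", are rejected with an IllegalArgumentException.
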
  /**
   * Checks if an output name is valid.
   *
   * The name cannot be the name used for the default output.
   *
   * @param outputPath base output path
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name; it has to be a word, letters
   *                          and numbers only, and cannot be the word 'part'
   *                          as that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

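  // For illustration: a call such as
  //
  //   MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
  //     LongWritable.class, Text.class);
  //
  // appends "text" to the space-separated list in "mapreduce.multipleoutputs"
  // and records the output format, key class and value class under the
  // "mapreduce.multipleoutputs.namedOutput.text.format", ".key" and ".value"
  // configuration keys, which the getters above read back.
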
  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job     job to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job    the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

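  // For illustration: enabling the counters at job-submission time with
  //
  //   MultipleOutputs.setCountersEnabled(job, true);
  //
  // makes every record written through MultipleOutputs increment a counter in
  // the group "org.apache.hadoop.mapreduce.lib.output.MultipleOutputs" (the
  // COUNTERS_GROUP above); the counter name is the base output path of the
  // write, which is simply the named output name when the
  // write(namedOutput, key, value) form is used.
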
  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // Instance code, to be used from Mapper/Reducer code.

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * The output is written to a unique file generated for the namedOutput,
   * for example {namedOutput}-(m|r)-{part-number}.
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

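  // For illustration: with a named output "text" added via addNamedOutput,
  //
  //   mos.write("text", key, value);
  //
  // is equivalent to mos.write("text", key, value, "text"), and the records
  // end up in files named like "text-m-<part>" (map side) or "text-r-<part>"
  // (reduce side) next to the job's default "part-*" files.
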
  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

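  // For illustration: assuming the job's output format is a FileOutputFormat,
  // a call such as
  //
  //   mos.write(key, value, "logs/error");
  //
  // (where "logs/error" is just an example path) produces files like
  // "logs/error-r-<part>" under the job output directory; '/' separators in
  // baseOutputPath become subdirectory levels.
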
  // By being synchronized, MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // output format and output key/value types put in the context.
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
        .getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}