001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred.lib;
019
020import org.apache.hadoop.classification.InterfaceAudience;
021import org.apache.hadoop.classification.InterfaceStability;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.mapred.*;
024import org.apache.hadoop.util.Progressable;
025
026import java.io.IOException;
027import java.util.*;
028
029/**
 * The MultipleOutputs class simplifies writing to additional outputs other
031 * than the job default output via the <code>OutputCollector</code> passed to
032 * the <code>map()</code> and <code>reduce()</code> methods of the
033 * <code>Mapper</code> and <code>Reducer</code> implementations.
034 * <p>
035 * Each additional output, or named output, may be configured with its own
036 * <code>OutputFormat</code>, with its own key class and with its own value
037 * class.
038 * <p>
 * A named output can be a single file or a multi file. The latter is
 * referred to as a multi named output.
041 * <p>
042 * A multi named output is an unbound set of files all sharing the same
043 * <code>OutputFormat</code>, key class and value class configuration.
044 * <p>
045 * When named outputs are used within a <code>Mapper</code> implementation,
046 * key/values written to a name output are not part of the reduce phase, only
047 * key/values written to the job <code>OutputCollector</code> are part of the
048 * reduce phase.
049 * <p>
 * MultipleOutputs supports counters, by default they are disabled. The counters
051 * group is the {@link MultipleOutputs} class name.
052 * </p>
 * The names of the counters are the same as the named outputs. For multi
 * named outputs the name of the counter is the concatenation of the named
 * output, an underscore '_' and the multiname.
056 * <p>
057 * Job configuration usage pattern is:
058 * <pre>
059 *
060 * JobConf conf = new JobConf();
061 *
062 * conf.setInputPath(inDir);
063 * FileOutputFormat.setOutputPath(conf, outDir);
064 *
065 * conf.setMapperClass(MOMap.class);
066 * conf.setReducerClass(MOReduce.class);
067 * ...
068 *
069 * // Defines additional single text based output 'text' for the job
070 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
071 * LongWritable.class, Text.class);
072 *
 * // Defines additional multi sequencefile based output 'seq' for the
 * // job
075 * MultipleOutputs.addMultiNamedOutput(conf, "seq",
076 *   SequenceFileOutputFormat.class,
077 *   LongWritable.class, Text.class);
078 * ...
079 *
080 * JobClient jc = new JobClient();
081 * RunningJob job = jc.submitJob(conf);
082 *
083 * ...
084 * </pre>
085 * <p>
 * Usage pattern within a Reducer implementation is:
087 * <pre>
088 *
089 * public class MOReduce implements
090 *   Reducer&lt;WritableComparable, Writable&gt; {
091 * private MultipleOutputs mos;
092 *
093 * public void configure(JobConf conf) {
094 * ...
095 * mos = new MultipleOutputs(conf);
096 * }
097 *
098 * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
099 * OutputCollector output, Reporter reporter)
100 * throws IOException {
101 * ...
102 * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
103 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
104 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
105 * ...
106 * }
107 *
108 * public void close() throws IOException {
109 * mos.close();
110 * ...
111 * }
112 *
113 * }
114 * </pre>
115 */
116@InterfaceAudience.Public
117@InterfaceStability.Stable
118public class MultipleOutputs {
119
  // Configuration key holding the space separated list of named output names.
  private static final String NAMED_OUTPUTS = "mo.namedOutputs";

  // Prefix of every per named output configuration key; the named output name
  // and one of the suffixes below are appended to it.
  private static final String MO_PREFIX = "mo.namedOutput.";

  // Key suffixes for the OutputFormat class, the key class, the value class
  // and the multi flag of a named output.
  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String MULTI = ".multi";

  // Configuration key of the boolean flag that enables the counters.
  private static final String COUNTERS_ENABLED = "mo.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
135
136  /**
137   * Checks if a named output is alreadyDefined or not.
138   *
139   * @param conf           job conf
140   * @param namedOutput    named output names
141   * @param alreadyDefined whether the existence/non-existence of
142   *                       the named output is to be checked
143   * @throws IllegalArgumentException if the output name is alreadyDefined or
144   *                                  not depending on the value of the
145   *                                  'alreadyDefined' parameter
146   */
147  private static void checkNamedOutput(JobConf conf, String namedOutput,
148                                       boolean alreadyDefined) {
149    List<String> definedChannels = getNamedOutputsList(conf);
150    if (alreadyDefined && definedChannels.contains(namedOutput)) {
151      throw new IllegalArgumentException("Named output '" + namedOutput +
152        "' already alreadyDefined");
153    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
154      throw new IllegalArgumentException("Named output '" + namedOutput +
155        "' not defined");
156    }
157  }
158
159  /**
160   * Checks if a named output name is valid token.
161   *
162   * @param namedOutput named output Name
163   * @throws IllegalArgumentException if the output name is not valid.
164   */
165  private static void checkTokenName(String namedOutput) {
166    if (namedOutput == null || namedOutput.length() == 0) {
167      throw new IllegalArgumentException(
168        "Name cannot be NULL or emtpy");
169    }
170    for (char ch : namedOutput.toCharArray()) {
171      if ((ch >= 'A') && (ch <= 'Z')) {
172        continue;
173      }
174      if ((ch >= 'a') && (ch <= 'z')) {
175        continue;
176      }
177      if ((ch >= '0') && (ch <= '9')) {
178        continue;
179      }
180      throw new IllegalArgumentException(
181        "Name cannot be have a '" + ch + "' char");
182    }
183  }
184
185  /**
186   * Checks if a named output name is valid.
187   *
188   * @param namedOutput named output Name
189   * @throws IllegalArgumentException if the output name is not valid.
190   */
191  private static void checkNamedOutputName(String namedOutput) {
192    checkTokenName(namedOutput);
193    // name cannot be the name used for the default output
194    if (namedOutput.equals("part")) {
195      throw new IllegalArgumentException(
196        "Named output name cannot be 'part'");
197    }
198  }
199
200  /**
201   * Returns list of channel names.
202   *
203   * @param conf job conf
204   * @return List of channel Names
205   */
206  public static List<String> getNamedOutputsList(JobConf conf) {
207    List<String> names = new ArrayList<String>();
208    StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
209    while (st.hasMoreTokens()) {
210      names.add(st.nextToken());
211    }
212    return names;
213  }
214
215
216  /**
217   * Returns if a named output is multiple.
218   *
219   * @param conf        job conf
220   * @param namedOutput named output
221   * @return <code>true</code> if the name output is multi, <code>false</code>
222   *         if it is single. If the name output is not defined it returns
223   *         <code>false</code>
224   */
225  public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
226    checkNamedOutput(conf, namedOutput, false);
227    return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
228  }
229
230  /**
231   * Returns the named output OutputFormat.
232   *
233   * @param conf        job conf
234   * @param namedOutput named output
235   * @return namedOutput OutputFormat
236   */
237  public static Class<? extends OutputFormat> getNamedOutputFormatClass(
238    JobConf conf, String namedOutput) {
239    checkNamedOutput(conf, namedOutput, false);
240    return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
241      OutputFormat.class);
242  }
243
244  /**
245   * Returns the key class for a named output.
246   *
247   * @param conf        job conf
248   * @param namedOutput named output
249   * @return class for the named output key
250   */
251  public static Class<?> getNamedOutputKeyClass(JobConf conf,
252                                                String namedOutput) {
253    checkNamedOutput(conf, namedOutput, false);
254    return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
255        Object.class);
256  }
257
258  /**
259   * Returns the value class for a named output.
260   *
261   * @param conf        job conf
262   * @param namedOutput named output
263   * @return class of named output value
264   */
265  public static Class<?> getNamedOutputValueClass(JobConf conf,
266                                                  String namedOutput) {
267    checkNamedOutput(conf, namedOutput, false);
268    return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
269      Object.class);
270  }
271
272  /**
273   * Adds a named output for the job.
274   *
275   * @param conf              job conf to add the named output
276   * @param namedOutput       named output name, it has to be a word, letters
277   *                          and numbers only, cannot be the word 'part' as
278   *                          that is reserved for the
279   *                          default output.
280   * @param outputFormatClass OutputFormat class.
281   * @param keyClass          key class
282   * @param valueClass        value class
283   */
284  public static void addNamedOutput(JobConf conf, String namedOutput,
285                                Class<? extends OutputFormat> outputFormatClass,
286                                Class<?> keyClass, Class<?> valueClass) {
287    addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
288      valueClass);
289  }
290
291  /**
292   * Adds a multi named output for the job.
293   *
294   * @param conf              job conf to add the named output
295   * @param namedOutput       named output name, it has to be a word, letters
296   *                          and numbers only, cannot be the word 'part' as
297   *                          that is reserved for the
298   *                          default output.
299   * @param outputFormatClass OutputFormat class.
300   * @param keyClass          key class
301   * @param valueClass        value class
302   */
303  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
304                               Class<? extends OutputFormat> outputFormatClass,
305                               Class<?> keyClass, Class<?> valueClass) {
306    addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
307      valueClass);
308  }
309
310  /**
311   * Adds a named output for the job.
312   *
313   * @param conf              job conf to add the named output
314   * @param namedOutput       named output name, it has to be a word, letters
315   *                          and numbers only, cannot be the word 'part' as
316   *                          that is reserved for the
317   *                          default output.
318   * @param multi             indicates if the named output is multi
319   * @param outputFormatClass OutputFormat class.
320   * @param keyClass          key class
321   * @param valueClass        value class
322   */
323  private static void addNamedOutput(JobConf conf, String namedOutput,
324                               boolean multi,
325                               Class<? extends OutputFormat> outputFormatClass,
326                               Class<?> keyClass, Class<?> valueClass) {
327    checkNamedOutputName(namedOutput);
328    checkNamedOutput(conf, namedOutput, true);
329    conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
330    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
331      OutputFormat.class);
332    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
333    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
334    conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
335  }
336
337  /**
338   * Enables or disables counters for the named outputs.
339   * <p>
340   * By default these counters are disabled.
341   * <p>
342   * MultipleOutputs supports counters, by default the are disabled.
343   * The counters group is the {@link MultipleOutputs} class name.
344   * </p>
345   * The names of the counters are the same as the named outputs. For multi
346   * named outputs the name of the counter is the concatenation of the named
347   * output, and underscore '_' and the multiname.
348   *
349   * @param conf    job conf to enableadd the named output.
350   * @param enabled indicates if the counters will be enabled or not.
351   */
352  public static void setCountersEnabled(JobConf conf, boolean enabled) {
353    conf.setBoolean(COUNTERS_ENABLED, enabled);
354  }
355
356  /**
357   * Returns if the counters for the named outputs are enabled or not.
358   * <p>
359   * By default these counters are disabled.
360   * <p>
361   * MultipleOutputs supports counters, by default the are disabled.
362   * The counters group is the {@link MultipleOutputs} class name.
363   * </p>
364   * The names of the counters are the same as the named outputs. For multi
365   * named outputs the name of the counter is the concatenation of the named
366   * output, and underscore '_' and the multiname.
367   *
368   *
369   * @param conf    job conf to enableadd the named output.
370   * @return TRUE if the counters are enabled, FALSE if they are disabled.
371   */
372  public static boolean getCountersEnabled(JobConf conf) {
373    return conf.getBoolean(COUNTERS_ENABLED, false);
374  }
375
  // instance code, to be used from Mapper/Reducer code

  // Job configuration given to the constructor.
  private JobConf conf;
  // Output format through which the record writers of the named outputs are
  // created.
  private OutputFormat outputFormat;
  // Unmodifiable set with the named outputs defined in the job configuration.
  private Set<String> namedOutputs;
  // Record writers opened so far, keyed by base file name.
  private Map<String, RecordWriter> recordWriters;
  // Whether counters are enabled in the job configuration.
  private boolean countersEnabled;
383
384  /**
385   * Creates and initializes multiple named outputs support, it should be
386   * instantiated in the Mapper/Reducer configure method.
387   *
388   * @param job the job configuration object
389   */
390  public MultipleOutputs(JobConf job) {
391    this.conf = job;
392    outputFormat = new InternalFileOutputFormat();
393    namedOutputs = Collections.unmodifiableSet(
394      new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
395    recordWriters = new HashMap<String, RecordWriter>();
396    countersEnabled = getCountersEnabled(job);
397  }
398
399  /**
400   * Returns iterator with the defined name outputs.
401   *
402   * @return iterator with the defined named outputs
403   */
404  public Iterator<String> getNamedOutputs() {
405    return namedOutputs.iterator();
406  }
407
408
409  // by being synchronized MultipleOutputTask can be use with a
410  // MultithreaderMapRunner.
411  private synchronized RecordWriter getRecordWriter(String namedOutput,
412                                                    String baseFileName,
413                                                    final Reporter reporter)
414    throws IOException {
415    RecordWriter writer = recordWriters.get(baseFileName);
416    if (writer == null) {
417      if (countersEnabled && reporter == null) {
418        throw new IllegalArgumentException(
419          "Counters are enabled, Reporter cannot be NULL");
420      }
421      JobConf jobConf = new JobConf(conf);
422      jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
423      FileSystem fs = FileSystem.get(conf);
424      writer =
425        outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
426
427      if (countersEnabled) {
428        if (reporter == null) {
429          throw new IllegalArgumentException(
430            "Counters are enabled, Reporter cannot be NULL");
431        }
432        writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
433      }
434
435      recordWriters.put(baseFileName, writer);
436    }
437    return writer;
438  }
439
440  private static class RecordWriterWithCounter implements RecordWriter {
441    private RecordWriter writer;
442    private String counterName;
443    private Reporter reporter;
444
445    public RecordWriterWithCounter(RecordWriter writer, String counterName,
446                                   Reporter reporter) {
447      this.writer = writer;
448      this.counterName = counterName;
449      this.reporter = reporter;
450    }
451
452    @SuppressWarnings({"unchecked"})
453    public void write(Object key, Object value) throws IOException {
454      reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
455      writer.write(key, value);
456    }
457
458    public void close(Reporter reporter) throws IOException {
459      writer.close(reporter);
460    }
461  }
462
463  /**
464   * Gets the output collector for a named output.
465   *
466   * @param namedOutput the named output name
467   * @param reporter    the reporter
468   * @return the output collector for the given named output
469   * @throws IOException thrown if output collector could not be created
470   */
471  @SuppressWarnings({"unchecked"})
472  public OutputCollector getCollector(String namedOutput, Reporter reporter)
473    throws IOException {
474    return getCollector(namedOutput, null, reporter);
475  }
476
477  /**
478   * Gets the output collector for a multi named output.
479   *
480   * @param namedOutput the named output name
481   * @param multiName   the multi name part
482   * @param reporter    the reporter
483   * @return the output collector for the given named output
484   * @throws IOException thrown if output collector could not be created
485   */
486  @SuppressWarnings({"unchecked"})
487  public OutputCollector getCollector(String namedOutput, String multiName,
488                                      Reporter reporter)
489    throws IOException {
490
491    checkNamedOutputName(namedOutput);
492    if (!namedOutputs.contains(namedOutput)) {
493      throw new IllegalArgumentException("Undefined named output '" +
494        namedOutput + "'");
495    }
496    boolean multi = isMultiNamedOutput(conf, namedOutput);
497
498    if (!multi && multiName != null) {
499      throw new IllegalArgumentException("Name output '" + namedOutput +
500        "' has not been defined as multi");
501    }
502    if (multi) {
503      checkTokenName(multiName);
504    }
505
506    String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
507
508    final RecordWriter writer =
509      getRecordWriter(namedOutput, baseFileName, reporter);
510
511    return new OutputCollector() {
512
513      @SuppressWarnings({"unchecked"})
514      public void collect(Object key, Object value) throws IOException {
515        writer.write(key, value);
516      }
517
518    };
519  }
520
521  /**
522   * Closes all the opened named outputs.
523   * <p>
524   * If overriden subclasses must invoke <code>super.close()</code> at the
525   * end of their <code>close()</code>
526   *
527   * @throws java.io.IOException thrown if any of the MultipleOutput files
528   *                             could not be closed properly.
529   */
530  public void close() throws IOException {
531    for (RecordWriter writer : recordWriters.values()) {
532      writer.close(null);
533    }
534  }
535
536  private static class InternalFileOutputFormat extends
537    FileOutputFormat<Object, Object> {
538
539    public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
540
541    @SuppressWarnings({"unchecked"})
542    public RecordWriter<Object, Object> getRecordWriter(
543      FileSystem fs, JobConf job, String baseFileName, Progressable progress)
544      throws IOException {
545
546      String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
547      String fileName = getUniqueName(job, baseFileName);
548
549      // The following trick leverages the instantiation of a record writer via
550      // the job conf thus supporting arbitrary output formats.
551      JobConf outputConf = new JobConf(job);
552      outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
553      outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
554      outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
555      OutputFormat outputFormat = outputConf.getOutputFormat();
556      return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
557    }
558  }
559
560}