001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapred.lib;
019
020import org.apache.hadoop.classification.InterfaceAudience;
021import org.apache.hadoop.classification.InterfaceStability;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.mapred.*;
024import org.apache.hadoop.util.Progressable;
025
026import java.io.IOException;
027import java.util.*;
028
029/**
 * The MultipleOutputs class simplifies writing to additional outputs other
 * than the job default output via the <code>OutputCollector</code> passed to
 * the <code>map()</code> and <code>reduce()</code> methods of the
 * <code>Mapper</code> and <code>Reducer</code> implementations.
034 * <p/>
035 * Each additional output, or named output, may be configured with its own
036 * <code>OutputFormat</code>, with its own key class and with its own value
037 * class.
038 * <p/>
 * A named output can be a single file or a multi file. The latter is
 * referred to as a multi named output.
041 * <p/>
042 * A multi named output is an unbound set of files all sharing the same
043 * <code>OutputFormat</code>, key class and value class configuration.
044 * <p/>
 * When named outputs are used within a <code>Mapper</code> implementation,
 * key/values written to a named output are not part of the reduce phase, only
 * key/values written to the job <code>OutputCollector</code> are part of the
 * reduce phase.
049 * <p/>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name.
 * <p/>
 * The names of the counters are the same as the named outputs. For multi
 * named outputs the name of the counter is the concatenation of the named
 * output, an underscore '_' and the multiname.
056 * <p/>
057 * Job configuration usage pattern is:
058 * <pre>
059 *
060 * JobConf conf = new JobConf();
061 *
062 * conf.setInputPath(inDir);
063 * FileOutputFormat.setOutputPath(conf, outDir);
064 *
065 * conf.setMapperClass(MOMap.class);
066 * conf.setReducerClass(MOReduce.class);
067 * ...
068 *
069 * // Defines additional single text based output 'text' for the job
070 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
071 * LongWritable.class, Text.class);
072 *
 * // Defines additional multi sequencefile based output 'seq' for the
 * // job
075 * MultipleOutputs.addMultiNamedOutput(conf, "seq",
076 *   SequenceFileOutputFormat.class,
077 *   LongWritable.class, Text.class);
078 * ...
079 *
080 * JobClient jc = new JobClient();
081 * RunningJob job = jc.submitJob(conf);
082 *
083 * ...
084 * </pre>
085 * <p/>
 * Usage pattern inside a <code>Reducer</code> implementation is:
087 * <pre>
088 *
089 * public class MOReduce implements
090 *   Reducer&lt;WritableComparable, Writable&gt; {
091 * private MultipleOutputs mos;
092 *
093 * public void configure(JobConf conf) {
094 * ...
095 * mos = new MultipleOutputs(conf);
096 * }
097 *
098 * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
099 * OutputCollector output, Reporter reporter)
100 * throws IOException {
101 * ...
102 * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
103 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
104 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
105 * ...
106 * }
107 *
108 * public void close() throws IOException {
109 * mos.close();
110 * ...
111 * }
112 *
113 * }
114 * </pre>
115 */
116@InterfaceAudience.Public
117@InterfaceStability.Stable
118public class MultipleOutputs {
119
120  private static final String NAMED_OUTPUTS = "mo.namedOutputs";
121
122  private static final String MO_PREFIX = "mo.namedOutput.";
123
124  private static final String FORMAT = ".format";
125  private static final String KEY = ".key";
126  private static final String VALUE = ".value";
127  private static final String MULTI = ".multi";
128
129  private static final String COUNTERS_ENABLED = "mo.counters";
130
131  /**
132   * Counters group used by the counters of MultipleOutputs.
133   */
134  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
135
136  /**
137   * Checks if a named output is alreadyDefined or not.
138   *
139   * @param conf           job conf
140   * @param namedOutput    named output names
141   * @param alreadyDefined whether the existence/non-existence of
142   *                       the named output is to be checked
143   * @throws IllegalArgumentException if the output name is alreadyDefined or
144   *                                  not depending on the value of the
145   *                                  'alreadyDefined' parameter
146   */
147  private static void checkNamedOutput(JobConf conf, String namedOutput,
148                                       boolean alreadyDefined) {
149    List<String> definedChannels = getNamedOutputsList(conf);
150    if (alreadyDefined && definedChannels.contains(namedOutput)) {
151      throw new IllegalArgumentException("Named output '" + namedOutput +
152        "' already alreadyDefined");
153    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
154      throw new IllegalArgumentException("Named output '" + namedOutput +
155        "' not defined");
156    }
157  }
158
159  /**
160   * Checks if a named output name is valid token.
161   *
162   * @param namedOutput named output Name
163   * @throws IllegalArgumentException if the output name is not valid.
164   */
165  private static void checkTokenName(String namedOutput) {
166    if (namedOutput == null || namedOutput.length() == 0) {
167      throw new IllegalArgumentException(
168        "Name cannot be NULL or emtpy");
169    }
170    for (char ch : namedOutput.toCharArray()) {
171      if ((ch >= 'A') && (ch <= 'Z')) {
172        continue;
173      }
174      if ((ch >= 'a') && (ch <= 'z')) {
175        continue;
176      }
177      if ((ch >= '0') && (ch <= '9')) {
178        continue;
179      }
180      throw new IllegalArgumentException(
181        "Name cannot be have a '" + ch + "' char");
182    }
183  }
184
185  /**
186   * Checks if a named output name is valid.
187   *
188   * @param namedOutput named output Name
189   * @throws IllegalArgumentException if the output name is not valid.
190   */
191  private static void checkNamedOutputName(String namedOutput) {
192    checkTokenName(namedOutput);
193    // name cannot be the name used for the default output
194    if (namedOutput.equals("part")) {
195      throw new IllegalArgumentException(
196        "Named output name cannot be 'part'");
197    }
198  }
199
200  /**
201   * Returns list of channel names.
202   *
203   * @param conf job conf
204   * @return List of channel Names
205   */
206  public static List<String> getNamedOutputsList(JobConf conf) {
207    List<String> names = new ArrayList<String>();
208    StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
209    while (st.hasMoreTokens()) {
210      names.add(st.nextToken());
211    }
212    return names;
213  }
214
215
216  /**
217   * Returns if a named output is multiple.
218   *
219   * @param conf        job conf
220   * @param namedOutput named output
221   * @return <code>true</code> if the name output is multi, <code>false</code>
222   *         if it is single. If the name output is not defined it returns
223   *         <code>false</code>
224   */
225  public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
226    checkNamedOutput(conf, namedOutput, false);
227    return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
228  }
229
230  /**
231   * Returns the named output OutputFormat.
232   *
233   * @param conf        job conf
234   * @param namedOutput named output
235   * @return namedOutput OutputFormat
236   */
237  public static Class<? extends OutputFormat> getNamedOutputFormatClass(
238    JobConf conf, String namedOutput) {
239    checkNamedOutput(conf, namedOutput, false);
240    return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
241      OutputFormat.class);
242  }
243
244  /**
245   * Returns the key class for a named output.
246   *
247   * @param conf        job conf
248   * @param namedOutput named output
249   * @return class for the named output key
250   */
251  public static Class<?> getNamedOutputKeyClass(JobConf conf,
252                                                String namedOutput) {
253    checkNamedOutput(conf, namedOutput, false);
254    return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
255        Object.class);
256  }
257
258  /**
259   * Returns the value class for a named output.
260   *
261   * @param conf        job conf
262   * @param namedOutput named output
263   * @return class of named output value
264   */
265  public static Class<?> getNamedOutputValueClass(JobConf conf,
266                                                  String namedOutput) {
267    checkNamedOutput(conf, namedOutput, false);
268    return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
269      Object.class);
270  }
271
272  /**
273   * Adds a named output for the job.
274   * <p/>
275   *
276   * @param conf              job conf to add the named output
277   * @param namedOutput       named output name, it has to be a word, letters
278   *                          and numbers only, cannot be the word 'part' as
279   *                          that is reserved for the
280   *                          default output.
281   * @param outputFormatClass OutputFormat class.
282   * @param keyClass          key class
283   * @param valueClass        value class
284   */
285  public static void addNamedOutput(JobConf conf, String namedOutput,
286                                Class<? extends OutputFormat> outputFormatClass,
287                                Class<?> keyClass, Class<?> valueClass) {
288    addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
289      valueClass);
290  }
291
292  /**
293   * Adds a multi named output for the job.
294   * <p/>
295   *
296   * @param conf              job conf to add the named output
297   * @param namedOutput       named output name, it has to be a word, letters
298   *                          and numbers only, cannot be the word 'part' as
299   *                          that is reserved for the
300   *                          default output.
301   * @param outputFormatClass OutputFormat class.
302   * @param keyClass          key class
303   * @param valueClass        value class
304   */
305  public static void addMultiNamedOutput(JobConf conf, String namedOutput,
306                               Class<? extends OutputFormat> outputFormatClass,
307                               Class<?> keyClass, Class<?> valueClass) {
308    addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
309      valueClass);
310  }
311
312  /**
313   * Adds a named output for the job.
314   * <p/>
315   *
316   * @param conf              job conf to add the named output
317   * @param namedOutput       named output name, it has to be a word, letters
318   *                          and numbers only, cannot be the word 'part' as
319   *                          that is reserved for the
320   *                          default output.
321   * @param multi             indicates if the named output is multi
322   * @param outputFormatClass OutputFormat class.
323   * @param keyClass          key class
324   * @param valueClass        value class
325   */
326  private static void addNamedOutput(JobConf conf, String namedOutput,
327                               boolean multi,
328                               Class<? extends OutputFormat> outputFormatClass,
329                               Class<?> keyClass, Class<?> valueClass) {
330    checkNamedOutputName(namedOutput);
331    checkNamedOutput(conf, namedOutput, true);
332    conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
333    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
334      OutputFormat.class);
335    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
336    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
337    conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
338  }
339
340  /**
341   * Enables or disables counters for the named outputs.
342   * <p/>
343   * By default these counters are disabled.
344   * <p/>
345   * MultipleOutputs supports counters, by default the are disabled.
346   * The counters group is the {@link MultipleOutputs} class name.
347   * </p>
348   * The names of the counters are the same as the named outputs. For multi
349   * named outputs the name of the counter is the concatenation of the named
350   * output, and underscore '_' and the multiname.
351   *
352   * @param conf    job conf to enableadd the named output.
353   * @param enabled indicates if the counters will be enabled or not.
354   */
355  public static void setCountersEnabled(JobConf conf, boolean enabled) {
356    conf.setBoolean(COUNTERS_ENABLED, enabled);
357  }
358
359  /**
360   * Returns if the counters for the named outputs are enabled or not.
361   * <p/>
362   * By default these counters are disabled.
363   * <p/>
364   * MultipleOutputs supports counters, by default the are disabled.
365   * The counters group is the {@link MultipleOutputs} class name.
366   * </p>
367   * The names of the counters are the same as the named outputs. For multi
368   * named outputs the name of the counter is the concatenation of the named
369   * output, and underscore '_' and the multiname.
370   *
371   *
372   * @param conf    job conf to enableadd the named output.
373   * @return TRUE if the counters are enabled, FALSE if they are disabled.
374   */
375  public static boolean getCountersEnabled(JobConf conf) {
376    return conf.getBoolean(COUNTERS_ENABLED, false);
377  }
378
379  // instance code, to be used from Mapper/Reducer code
380
381  private JobConf conf;
382  private OutputFormat outputFormat;
383  private Set<String> namedOutputs;
384  private Map<String, RecordWriter> recordWriters;
385  private boolean countersEnabled;
386
387  /**
388   * Creates and initializes multiple named outputs support, it should be
389   * instantiated in the Mapper/Reducer configure method.
390   *
391   * @param job the job configuration object
392   */
393  public MultipleOutputs(JobConf job) {
394    this.conf = job;
395    outputFormat = new InternalFileOutputFormat();
396    namedOutputs = Collections.unmodifiableSet(
397      new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
398    recordWriters = new HashMap<String, RecordWriter>();
399    countersEnabled = getCountersEnabled(job);
400  }
401
402  /**
403   * Returns iterator with the defined name outputs.
404   *
405   * @return iterator with the defined named outputs
406   */
407  public Iterator<String> getNamedOutputs() {
408    return namedOutputs.iterator();
409  }
410
411
412  // by being synchronized MultipleOutputTask can be use with a
413  // MultithreaderMapRunner.
414  private synchronized RecordWriter getRecordWriter(String namedOutput,
415                                                    String baseFileName,
416                                                    final Reporter reporter)
417    throws IOException {
418    RecordWriter writer = recordWriters.get(baseFileName);
419    if (writer == null) {
420      if (countersEnabled && reporter == null) {
421        throw new IllegalArgumentException(
422          "Counters are enabled, Reporter cannot be NULL");
423      }
424      JobConf jobConf = new JobConf(conf);
425      jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
426      FileSystem fs = FileSystem.get(conf);
427      writer =
428        outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
429
430      if (countersEnabled) {
431        if (reporter == null) {
432          throw new IllegalArgumentException(
433            "Counters are enabled, Reporter cannot be NULL");
434        }
435        writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
436      }
437
438      recordWriters.put(baseFileName, writer);
439    }
440    return writer;
441  }
442
443  private static class RecordWriterWithCounter implements RecordWriter {
444    private RecordWriter writer;
445    private String counterName;
446    private Reporter reporter;
447
448    public RecordWriterWithCounter(RecordWriter writer, String counterName,
449                                   Reporter reporter) {
450      this.writer = writer;
451      this.counterName = counterName;
452      this.reporter = reporter;
453    }
454
455    @SuppressWarnings({"unchecked"})
456    public void write(Object key, Object value) throws IOException {
457      reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
458      writer.write(key, value);
459    }
460
461    public void close(Reporter reporter) throws IOException {
462      writer.close(reporter);
463    }
464  }
465
466  /**
467   * Gets the output collector for a named output.
468   * <p/>
469   *
470   * @param namedOutput the named output name
471   * @param reporter    the reporter
472   * @return the output collector for the given named output
473   * @throws IOException thrown if output collector could not be created
474   */
475  @SuppressWarnings({"unchecked"})
476  public OutputCollector getCollector(String namedOutput, Reporter reporter)
477    throws IOException {
478    return getCollector(namedOutput, null, reporter);
479  }
480
481  /**
482   * Gets the output collector for a multi named output.
483   * <p/>
484   *
485   * @param namedOutput the named output name
486   * @param multiName   the multi name part
487   * @param reporter    the reporter
488   * @return the output collector for the given named output
489   * @throws IOException thrown if output collector could not be created
490   */
491  @SuppressWarnings({"unchecked"})
492  public OutputCollector getCollector(String namedOutput, String multiName,
493                                      Reporter reporter)
494    throws IOException {
495
496    checkNamedOutputName(namedOutput);
497    if (!namedOutputs.contains(namedOutput)) {
498      throw new IllegalArgumentException("Undefined named output '" +
499        namedOutput + "'");
500    }
501    boolean multi = isMultiNamedOutput(conf, namedOutput);
502
503    if (!multi && multiName != null) {
504      throw new IllegalArgumentException("Name output '" + namedOutput +
505        "' has not been defined as multi");
506    }
507    if (multi) {
508      checkTokenName(multiName);
509    }
510
511    String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
512
513    final RecordWriter writer =
514      getRecordWriter(namedOutput, baseFileName, reporter);
515
516    return new OutputCollector() {
517
518      @SuppressWarnings({"unchecked"})
519      public void collect(Object key, Object value) throws IOException {
520        writer.write(key, value);
521      }
522
523    };
524  }
525
526  /**
527   * Closes all the opened named outputs.
528   * <p/>
529   * If overriden subclasses must invoke <code>super.close()</code> at the
530   * end of their <code>close()</code>
531   *
532   * @throws java.io.IOException thrown if any of the MultipleOutput files
533   *                             could not be closed properly.
534   */
535  public void close() throws IOException {
536    for (RecordWriter writer : recordWriters.values()) {
537      writer.close(null);
538    }
539  }
540
541  private static class InternalFileOutputFormat extends
542    FileOutputFormat<Object, Object> {
543
544    public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
545
546    @SuppressWarnings({"unchecked"})
547    public RecordWriter<Object, Object> getRecordWriter(
548      FileSystem fs, JobConf job, String baseFileName, Progressable progress)
549      throws IOException {
550
551      String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
552      String fileName = getUniqueName(job, baseFileName);
553
554      // The following trick leverages the instantiation of a record writer via
555      // the job conf thus supporting arbitrary output formats.
556      JobConf outputConf = new JobConf(job);
557      outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
558      outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
559      outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
560      OutputFormat outputFormat = outputConf.getOutputFormat();
561      return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
562    }
563  }
564
565}