001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapred.lib;
019    
020    import org.apache.hadoop.classification.InterfaceAudience;
021    import org.apache.hadoop.classification.InterfaceStability;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.hadoop.mapred.*;
024    import org.apache.hadoop.util.Progressable;
025    
026    import java.io.IOException;
027    import java.util.*;
028    
029    /**
030     * The MultipleOutputs class simplifies writting to additional outputs other
031     * than the job default output via the <code>OutputCollector</code> passed to
032     * the <code>map()</code> and <code>reduce()</code> methods of the
033     * <code>Mapper</code> and <code>Reducer</code> implementations.
034     * <p/>
035     * Each additional output, or named output, may be configured with its own
036     * <code>OutputFormat</code>, with its own key class and with its own value
037     * class.
038     * <p/>
039     * A named output can be a single file or a multi file. The later is refered as
040     * a multi named output.
041     * <p/>
042     * A multi named output is an unbound set of files all sharing the same
043     * <code>OutputFormat</code>, key class and value class configuration.
044     * <p/>
045     * When named outputs are used within a <code>Mapper</code> implementation,
046     * key/values written to a name output are not part of the reduce phase, only
047     * key/values written to the job <code>OutputCollector</code> are part of the
048     * reduce phase.
049     * <p/>
050     * MultipleOutputs supports counters, by default the are disabled. The counters
051     * group is the {@link MultipleOutputs} class name.
052     * </p>
053     * The names of the counters are the same as the named outputs. For multi
054     * named outputs the name of the counter is the concatenation of the named
055     * output, and underscore '_' and the multiname.
056     * <p/>
057     * Job configuration usage pattern is:
058     * <pre>
059     *
060     * JobConf conf = new JobConf();
061     *
062     * conf.setInputPath(inDir);
063     * FileOutputFormat.setOutputPath(conf, outDir);
064     *
065     * conf.setMapperClass(MOMap.class);
066     * conf.setReducerClass(MOReduce.class);
067     * ...
068     *
069     * // Defines additional single text based output 'text' for the job
070     * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
071     * LongWritable.class, Text.class);
072     *
073     * // Defines additional multi sequencefile based output 'sequence' for the
074     * // job
075     * MultipleOutputs.addMultiNamedOutput(conf, "seq",
076     *   SequenceFileOutputFormat.class,
077     *   LongWritable.class, Text.class);
078     * ...
079     *
080     * JobClient jc = new JobClient();
081     * RunningJob job = jc.submitJob(conf);
082     *
083     * ...
084     * </pre>
085     * <p/>
086     * Job configuration usage pattern is:
087     * <pre>
088     *
089     * public class MOReduce implements
090     *   Reducer&lt;WritableComparable, Writable&gt; {
091     * private MultipleOutputs mos;
092     *
093     * public void configure(JobConf conf) {
094     * ...
095     * mos = new MultipleOutputs(conf);
096     * }
097     *
098     * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
099     * OutputCollector output, Reporter reporter)
100     * throws IOException {
101     * ...
102     * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
103     * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
104     * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
105     * ...
106     * }
107     *
108     * public void close() throws IOException {
109     * mos.close();
110     * ...
111     * }
112     *
113     * }
114     * </pre>
115     */
116    @InterfaceAudience.Public
117    @InterfaceStability.Stable
118    public class MultipleOutputs {
119    
120      private static final String NAMED_OUTPUTS = "mo.namedOutputs";
121    
122      private static final String MO_PREFIX = "mo.namedOutput.";
123    
124      private static final String FORMAT = ".format";
125      private static final String KEY = ".key";
126      private static final String VALUE = ".value";
127      private static final String MULTI = ".multi";
128    
129      private static final String COUNTERS_ENABLED = "mo.counters";
130    
131      /**
132       * Counters group used by the counters of MultipleOutputs.
133       */
134      private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
135    
136      /**
137       * Checks if a named output is alreadyDefined or not.
138       *
139       * @param conf           job conf
140       * @param namedOutput    named output names
141       * @param alreadyDefined whether the existence/non-existence of
142       *                       the named output is to be checked
143       * @throws IllegalArgumentException if the output name is alreadyDefined or
144       *                                  not depending on the value of the
145       *                                  'alreadyDefined' parameter
146       */
147      private static void checkNamedOutput(JobConf conf, String namedOutput,
148                                           boolean alreadyDefined) {
149        List<String> definedChannels = getNamedOutputsList(conf);
150        if (alreadyDefined && definedChannels.contains(namedOutput)) {
151          throw new IllegalArgumentException("Named output '" + namedOutput +
152            "' already alreadyDefined");
153        } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
154          throw new IllegalArgumentException("Named output '" + namedOutput +
155            "' not defined");
156        }
157      }
158    
159      /**
160       * Checks if a named output name is valid token.
161       *
162       * @param namedOutput named output Name
163       * @throws IllegalArgumentException if the output name is not valid.
164       */
165      private static void checkTokenName(String namedOutput) {
166        if (namedOutput == null || namedOutput.length() == 0) {
167          throw new IllegalArgumentException(
168            "Name cannot be NULL or emtpy");
169        }
170        for (char ch : namedOutput.toCharArray()) {
171          if ((ch >= 'A') && (ch <= 'Z')) {
172            continue;
173          }
174          if ((ch >= 'a') && (ch <= 'z')) {
175            continue;
176          }
177          if ((ch >= '0') && (ch <= '9')) {
178            continue;
179          }
180          throw new IllegalArgumentException(
181            "Name cannot be have a '" + ch + "' char");
182        }
183      }
184    
185      /**
186       * Checks if a named output name is valid.
187       *
188       * @param namedOutput named output Name
189       * @throws IllegalArgumentException if the output name is not valid.
190       */
191      private static void checkNamedOutputName(String namedOutput) {
192        checkTokenName(namedOutput);
193        // name cannot be the name used for the default output
194        if (namedOutput.equals("part")) {
195          throw new IllegalArgumentException(
196            "Named output name cannot be 'part'");
197        }
198      }
199    
200      /**
201       * Returns list of channel names.
202       *
203       * @param conf job conf
204       * @return List of channel Names
205       */
206      public static List<String> getNamedOutputsList(JobConf conf) {
207        List<String> names = new ArrayList<String>();
208        StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
209        while (st.hasMoreTokens()) {
210          names.add(st.nextToken());
211        }
212        return names;
213      }
214    
215    
216      /**
217       * Returns if a named output is multiple.
218       *
219       * @param conf        job conf
220       * @param namedOutput named output
221       * @return <code>true</code> if the name output is multi, <code>false</code>
222       *         if it is single. If the name output is not defined it returns
223       *         <code>false</code>
224       */
225      public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
226        checkNamedOutput(conf, namedOutput, false);
227        return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
228      }
229    
230      /**
231       * Returns the named output OutputFormat.
232       *
233       * @param conf        job conf
234       * @param namedOutput named output
235       * @return namedOutput OutputFormat
236       */
237      public static Class<? extends OutputFormat> getNamedOutputFormatClass(
238        JobConf conf, String namedOutput) {
239        checkNamedOutput(conf, namedOutput, false);
240        return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
241          OutputFormat.class);
242      }
243    
244      /**
245       * Returns the key class for a named output.
246       *
247       * @param conf        job conf
248       * @param namedOutput named output
249       * @return class for the named output key
250       */
251      public static Class<?> getNamedOutputKeyClass(JobConf conf,
252                                                    String namedOutput) {
253        checkNamedOutput(conf, namedOutput, false);
254        return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
255            Object.class);
256      }
257    
258      /**
259       * Returns the value class for a named output.
260       *
261       * @param conf        job conf
262       * @param namedOutput named output
263       * @return class of named output value
264       */
265      public static Class<?> getNamedOutputValueClass(JobConf conf,
266                                                      String namedOutput) {
267        checkNamedOutput(conf, namedOutput, false);
268        return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
269          Object.class);
270      }
271    
272      /**
273       * Adds a named output for the job.
274       * <p/>
275       *
276       * @param conf              job conf to add the named output
277       * @param namedOutput       named output name, it has to be a word, letters
278       *                          and numbers only, cannot be the word 'part' as
279       *                          that is reserved for the
280       *                          default output.
281       * @param outputFormatClass OutputFormat class.
282       * @param keyClass          key class
283       * @param valueClass        value class
284       */
285      public static void addNamedOutput(JobConf conf, String namedOutput,
286                                    Class<? extends OutputFormat> outputFormatClass,
287                                    Class<?> keyClass, Class<?> valueClass) {
288        addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
289          valueClass);
290      }
291    
292      /**
293       * Adds a multi named output for the job.
294       * <p/>
295       *
296       * @param conf              job conf to add the named output
297       * @param namedOutput       named output name, it has to be a word, letters
298       *                          and numbers only, cannot be the word 'part' as
299       *                          that is reserved for the
300       *                          default output.
301       * @param outputFormatClass OutputFormat class.
302       * @param keyClass          key class
303       * @param valueClass        value class
304       */
305      public static void addMultiNamedOutput(JobConf conf, String namedOutput,
306                                   Class<? extends OutputFormat> outputFormatClass,
307                                   Class<?> keyClass, Class<?> valueClass) {
308        addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
309          valueClass);
310      }
311    
312      /**
313       * Adds a named output for the job.
314       * <p/>
315       *
316       * @param conf              job conf to add the named output
317       * @param namedOutput       named output name, it has to be a word, letters
318       *                          and numbers only, cannot be the word 'part' as
319       *                          that is reserved for the
320       *                          default output.
321       * @param multi             indicates if the named output is multi
322       * @param outputFormatClass OutputFormat class.
323       * @param keyClass          key class
324       * @param valueClass        value class
325       */
326      private static void addNamedOutput(JobConf conf, String namedOutput,
327                                   boolean multi,
328                                   Class<? extends OutputFormat> outputFormatClass,
329                                   Class<?> keyClass, Class<?> valueClass) {
330        checkNamedOutputName(namedOutput);
331        checkNamedOutput(conf, namedOutput, true);
332        conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
333        conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
334          OutputFormat.class);
335        conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
336        conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
337        conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
338      }
339    
340      /**
341       * Enables or disables counters for the named outputs.
342       * <p/>
343       * By default these counters are disabled.
344       * <p/>
345       * MultipleOutputs supports counters, by default the are disabled.
346       * The counters group is the {@link MultipleOutputs} class name.
347       * </p>
348       * The names of the counters are the same as the named outputs. For multi
349       * named outputs the name of the counter is the concatenation of the named
350       * output, and underscore '_' and the multiname.
351       *
352       * @param conf    job conf to enableadd the named output.
353       * @param enabled indicates if the counters will be enabled or not.
354       */
355      public static void setCountersEnabled(JobConf conf, boolean enabled) {
356        conf.setBoolean(COUNTERS_ENABLED, enabled);
357      }
358    
359      /**
360       * Returns if the counters for the named outputs are enabled or not.
361       * <p/>
362       * By default these counters are disabled.
363       * <p/>
364       * MultipleOutputs supports counters, by default the are disabled.
365       * The counters group is the {@link MultipleOutputs} class name.
366       * </p>
367       * The names of the counters are the same as the named outputs. For multi
368       * named outputs the name of the counter is the concatenation of the named
369       * output, and underscore '_' and the multiname.
370       *
371       *
372       * @param conf    job conf to enableadd the named output.
373       * @return TRUE if the counters are enabled, FALSE if they are disabled.
374       */
375      public static boolean getCountersEnabled(JobConf conf) {
376        return conf.getBoolean(COUNTERS_ENABLED, false);
377      }
378    
379      // instance code, to be used from Mapper/Reducer code
380    
381      private JobConf conf;
382      private OutputFormat outputFormat;
383      private Set<String> namedOutputs;
384      private Map<String, RecordWriter> recordWriters;
385      private boolean countersEnabled;
386    
387      /**
388       * Creates and initializes multiple named outputs support, it should be
389       * instantiated in the Mapper/Reducer configure method.
390       *
391       * @param job the job configuration object
392       */
393      public MultipleOutputs(JobConf job) {
394        this.conf = job;
395        outputFormat = new InternalFileOutputFormat();
396        namedOutputs = Collections.unmodifiableSet(
397          new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
398        recordWriters = new HashMap<String, RecordWriter>();
399        countersEnabled = getCountersEnabled(job);
400      }
401    
402      /**
403       * Returns iterator with the defined name outputs.
404       *
405       * @return iterator with the defined named outputs
406       */
407      public Iterator<String> getNamedOutputs() {
408        return namedOutputs.iterator();
409      }
410    
411    
412      // by being synchronized MultipleOutputTask can be use with a
413      // MultithreaderMapRunner.
414      private synchronized RecordWriter getRecordWriter(String namedOutput,
415                                                        String baseFileName,
416                                                        final Reporter reporter)
417        throws IOException {
418        RecordWriter writer = recordWriters.get(baseFileName);
419        if (writer == null) {
420          if (countersEnabled && reporter == null) {
421            throw new IllegalArgumentException(
422              "Counters are enabled, Reporter cannot be NULL");
423          }
424          JobConf jobConf = new JobConf(conf);
425          jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
426          FileSystem fs = FileSystem.get(conf);
427          writer =
428            outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
429    
430          if (countersEnabled) {
431            if (reporter == null) {
432              throw new IllegalArgumentException(
433                "Counters are enabled, Reporter cannot be NULL");
434            }
435            writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
436          }
437    
438          recordWriters.put(baseFileName, writer);
439        }
440        return writer;
441      }
442    
443      private static class RecordWriterWithCounter implements RecordWriter {
444        private RecordWriter writer;
445        private String counterName;
446        private Reporter reporter;
447    
448        public RecordWriterWithCounter(RecordWriter writer, String counterName,
449                                       Reporter reporter) {
450          this.writer = writer;
451          this.counterName = counterName;
452          this.reporter = reporter;
453        }
454    
455        @SuppressWarnings({"unchecked"})
456        public void write(Object key, Object value) throws IOException {
457          reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
458          writer.write(key, value);
459        }
460    
461        public void close(Reporter reporter) throws IOException {
462          writer.close(reporter);
463        }
464      }
465    
466      /**
467       * Gets the output collector for a named output.
468       * <p/>
469       *
470       * @param namedOutput the named output name
471       * @param reporter    the reporter
472       * @return the output collector for the given named output
473       * @throws IOException thrown if output collector could not be created
474       */
475      @SuppressWarnings({"unchecked"})
476      public OutputCollector getCollector(String namedOutput, Reporter reporter)
477        throws IOException {
478        return getCollector(namedOutput, null, reporter);
479      }
480    
481      /**
482       * Gets the output collector for a multi named output.
483       * <p/>
484       *
485       * @param namedOutput the named output name
486       * @param multiName   the multi name part
487       * @param reporter    the reporter
488       * @return the output collector for the given named output
489       * @throws IOException thrown if output collector could not be created
490       */
491      @SuppressWarnings({"unchecked"})
492      public OutputCollector getCollector(String namedOutput, String multiName,
493                                          Reporter reporter)
494        throws IOException {
495    
496        checkNamedOutputName(namedOutput);
497        if (!namedOutputs.contains(namedOutput)) {
498          throw new IllegalArgumentException("Undefined named output '" +
499            namedOutput + "'");
500        }
501        boolean multi = isMultiNamedOutput(conf, namedOutput);
502    
503        if (!multi && multiName != null) {
504          throw new IllegalArgumentException("Name output '" + namedOutput +
505            "' has not been defined as multi");
506        }
507        if (multi) {
508          checkTokenName(multiName);
509        }
510    
511        String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
512    
513        final RecordWriter writer =
514          getRecordWriter(namedOutput, baseFileName, reporter);
515    
516        return new OutputCollector() {
517    
518          @SuppressWarnings({"unchecked"})
519          public void collect(Object key, Object value) throws IOException {
520            writer.write(key, value);
521          }
522    
523        };
524      }
525    
526      /**
527       * Closes all the opened named outputs.
528       * <p/>
529       * If overriden subclasses must invoke <code>super.close()</code> at the
530       * end of their <code>close()</code>
531       *
532       * @throws java.io.IOException thrown if any of the MultipleOutput files
533       *                             could not be closed properly.
534       */
535      public void close() throws IOException {
536        for (RecordWriter writer : recordWriters.values()) {
537          writer.close(null);
538        }
539      }
540    
541      private static class InternalFileOutputFormat extends
542        FileOutputFormat<Object, Object> {
543    
544        public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
545    
546        @SuppressWarnings({"unchecked"})
547        public RecordWriter<Object, Object> getRecordWriter(
548          FileSystem fs, JobConf job, String baseFileName, Progressable progress)
549          throws IOException {
550    
551          String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
552          String fileName = getUniqueName(job, baseFileName);
553    
554          // The following trick leverages the instantiation of a record writer via
555          // the job conf thus supporting arbitrary output formats.
556          JobConf outputConf = new JobConf(job);
557          outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
558          outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
559          outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
560          OutputFormat outputFormat = outputConf.getOutputFormat();
561          return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
562        }
563      }
564    
565    }