/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files whose names are provided by the
 * user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
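 *
 * <p>
 * For example, the per-output counters can be turned on during job setup
 * (a minimal sketch):
 * </p>
 * <pre>
 * MultipleOutputs.setCountersEnabled(job, true);
 * </pre>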
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.setInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional single text-based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 *   LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 *   SequenceFileOutputFormat.class,
 *   LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * &lt;K, V&gt; String generateFileName(K k, V v) {
 *   return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
 *
 *   private MultipleOutputs mos;
 *
 *   public void setup(Context context) {
 *     ...
 *     mos = new MultipleOutputs(context);
 *   }
 *
 *   public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
 *       Context context)
 *       throws IOException, InterruptedException {
 *     ...
 *     mos.write("text", key, new Text("Hello"));
 *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 *     ...
 *   }
 *
 *   public void cleanup(Context context) throws IOException,
 *       InterruptedException {
 *     mos.close();
 *     ...
 *   }
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
 * from the old Hadoop API; i.e., output can be written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and
 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output:
 * </p>
 *
 * <pre>
 * private MultipleOutputs&lt;Text, Text&gt; out;
 *
 * public void setup(Context context) {
 *   out = new MultipleOutputs&lt;Text, Text&gt;(context);
 *   ...
 * }
 *
 * public void reduce(Text key, Iterable&lt;Text&gt; values, Context context) throws IOException, InterruptedException {
 *   for (Text t : values) {
 *     out.write(key, t, generateFileName(&lt;<i>parameter list...</i>&gt;));
 *   }
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, end your custom-generated path with "part" or a similar leaf name; otherwise your output
 * files will be named -00000, -00001, etc. No call to <code>context.write()</code> is necessary.
 * See the example <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 *   // expect Text k in format "Surname|Forename"
 *   String[] kStr = k.toString().split("\\|");
 *
 *   String sName = kStr[0];
 *   String fName = kStr[1];
 *
 *   // example for k = Smith|John
 *   // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 *   return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-r-00000.
 * To prevent this, use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
 * </p>
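 *
 * <p>
 * A minimal sketch of that job setup (assuming a text-output job):
 * </p>
 * <pre>
 * // declare the output format lazily so empty default part files are
 * // never created
 * LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 * </pre>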
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts.
   */
  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if the output name is valid.
   *
   * The name cannot be the name used for the default output.
   *
   * @param outputPath base output path
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
    JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
      OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
                                                String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * @param job               job to add the named output
   * @param namedOutput       named output name, it has to be a word, letters
   *                          and numbers only, cannot be the word 'part' as
   *                          that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass          key class
   * @param valueClass        value class
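   *
   * <p>
   * A minimal sketch (the output name and classes are illustrative):
   * </p>
   * <pre>
   * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
   *   LongWritable.class, Text.class);
   * </pre>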
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job     job to enable counters for
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled or not.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
                                   TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * The output path is a unique file generated for the namedOutput,
   * for example: {namedOutput}-(m|r)-{part-number}
   *
   * @param namedOutput the named output name
   * @param key         the key
   * @param value       the value
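   *
   * <pre>
   * // a minimal sketch: "text" must have been declared during job setup
   * // via addNamedOutput(job, "text", TextOutputFormat.class, ...)
   * mos.write("text", key, new Text("Hello"));
   * </pre>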
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput    the named output name
   * @param key            the key
   * @param value          the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
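   *
   * <pre>
   * // a minimal sketch: route records of the named output "seq" into
   * // files whose names start with "seq_a"
   * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
   * </pre>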
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
   * @param key   the key
   * @param value the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
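   *
   * <pre>
   * // a minimal sketch, assuming a FileOutputFormat-based job;
   * // "Smith/John" is an illustrative baseOutputPath
   * mos.write(key, value, "Smith/John");
   * </pre>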
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
                                   context.getTaskAttemptID(),
                                   new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // By being synchronized, MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for the record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from the context output format
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with the
  // output format and output key/value types put in the context.
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
        .getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}