/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.util.*;

/**
 * The MultipleOutputs class simplifies writing output data
 * to multiple outputs.
 *
 * <p>
 * Case one: writing to additional outputs other than the job default output.
 *
 * Each additional output, or named output, may be configured with its own
 * <code>OutputFormat</code>, with its own key class and with its own value
 * class.
 * </p>
 *
 * <p>
 * Case two: writing data to different files provided by the user.
 * </p>
 *
 * <p>
 * MultipleOutputs supports counters; by default they are disabled. The
 * counters group is the {@link MultipleOutputs} class name. The names of the
 * counters are the same as the output names. These count the number of
 * records written to each output name.
 * </p>
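 *
 * <p>
 * For example, counters can be enabled in the driver and read back once the
 * job completes; the named output "text" below is illustrative:
 * </p>
 * <pre>
 *
 * MultipleOutputs.setCountersEnabled(job, true);
 * ...
 * job.waitForCompletion(true);
 * long written = job.getCounters().findCounter(
 *     MultipleOutputs.class.getName(), "text").getValue();
 * </pre>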
 *
 * Usage pattern for job submission:
 * <pre>
 *
 * Job job = new Job();
 *
 * FileInputFormat.addInputPath(job, inDir);
 * FileOutputFormat.setOutputPath(job, outDir);
 *
 * job.setMapperClass(MOMap.class);
 * job.setReducerClass(MOReduce.class);
 * ...
 *
 * // Defines additional text-based output 'text' for the job
 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 * LongWritable.class, Text.class);
 *
 * // Defines additional sequence-file based output 'seq' for the job
 * MultipleOutputs.addNamedOutput(job, "seq",
 * SequenceFileOutputFormat.class,
 * LongWritable.class, Text.class);
 * ...
 *
 * job.waitForCompletion(true);
 * ...
 * </pre>
 * <p>
 * Usage in Reducer:
 * <pre>
 * <K, V> String generateFileName(K k, V v) {
 * return k.toString() + "_" + v.toString();
 * }
 *
 * public class MOReduce extends
 * Reducer<WritableComparable, Writable, WritableComparable, Writable> {
 * private MultipleOutputs mos;
 * public void setup(Context context) {
 * ...
 * mos = new MultipleOutputs(context);
 * }
 *
 * public void reduce(WritableComparable key, Iterable<Writable> values,
 * Context context)
 * throws IOException, InterruptedException {
 * ...
 * mos.write("text", key, new Text("Hello"));
 * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
 * mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
 * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 * ...
 * }
 *
 * public void cleanup(Context context) throws IOException, InterruptedException {
 * mos.close();
 * ...
 * }
 *
 * }
 * </pre>
 *
 * <p>
 * When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat,
 * MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat
 * from the old Hadoop API - i.e., output can be written from the Reducer to more than one location.
 * </p>
 *
 * <p>
 * Use <code>MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)</code> to write key and
 * value to a path specified by <code>baseOutputPath</code>, with no need to specify a named output:
 * </p>
 *
 * <pre>
 * private MultipleOutputs<Text, Text> out;
 *
 * public void setup(Context context) {
 * out = new MultipleOutputs<Text, Text>(context);
 * ...
 * }
 *
 * public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 * for (Text t : values) {
 * out.write(key, t, generateFileName(<<i>parameter list...</i>>));
 * }
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 * out.close();
 * }
 * </pre>
 *
 * <p>
 * Use your own code in <code>generateFileName()</code> to create a custom path to your results.
 * '/' characters in <code>baseOutputPath</code> will be translated into directory levels in your file system.
 * Also, append "part" or a similar name to your custom-generated path; otherwise your output files will be
 * named just -00000, -00001, etc. No call to <code>context.write()</code> is necessary.
 * See the example <code>generateFileName()</code> code below.
 * </p>
 *
 * <pre>
 * private String generateFileName(Text k) {
 * // expect Text k in format "Surname|Forename"
 * String[] kStr = k.toString().split("\\|");
 *
 * String sName = kStr[0];
 * String fName = kStr[1];
 *
 * // example for k = Smith|John
 * // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc)
 * return sName + "/" + fName;
 * }
 * </pre>
 *
 * <p>
 * Using MultipleOutputs in this way will still create zero-sized default output, e.g. part-00000.
 * To prevent this use <code>LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);</code>
 * instead of <code>job.setOutputFormatClass(TextOutputFormat.class);</code> in your Hadoop job configuration.
 * </p>
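 *
 * <p>
 * A minimal driver sketch of that configuration (the reducer class and output
 * directory are illustrative):
 * </p>
 * <pre>
 *
 * Job job = new Job(new Configuration());
 * FileOutputFormat.setOutputPath(job, outDir);
 * job.setReducerClass(MOReduce.class);
 * ...
 * // instead of job.setOutputFormatClass(TextOutputFormat.class):
 * LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 * job.waitForCompletion(true);
 * </pre>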
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleOutputs<KEYOUT, VALUEOUT> {

  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";

  private static final String MO_PREFIX =
    "mapreduce.multipleoutputs.namedOutput.";

  private static final String FORMAT = ".format";
  private static final String KEY = ".key";
  private static final String VALUE = ".value";
  private static final String COUNTERS_ENABLED =
    "mapreduce.multipleoutputs.counters";

  /**
   * Counters group used by the counters of MultipleOutputs.
   */
  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();

  /**
   * Cache for the taskContexts.
   */
  private Map<String, TaskAttemptContext> taskContexts =
    new HashMap<String, TaskAttemptContext>();
  /**
   * Cached TaskAttemptContext which uses the job's configured settings.
   */
  private TaskAttemptContext jobOutputFormatContext;

  /**
   * Checks if a named output name is a valid token.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or empty");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot have a '" + ch + "' char");
    }
  }

  /**
   * Checks if the output name is valid.
   *
   * The name cannot be the name used for the default output.
   * @param outputPath base output path
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkBaseOutputPath(String outputPath) {
    if (outputPath.equals(FileOutputFormat.PART)) {
      throw new IllegalArgumentException("output name cannot be 'part'");
    }
  }

  /**
   * Checks if a named output name is valid.
   *
   * @param namedOutput named output name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkNamedOutputName(JobContext job,
      String namedOutput, boolean alreadyDefined) {
    checkTokenName(namedOutput);
    checkBaseOutputPath(namedOutput);
    List<String> definedChannels = getNamedOutputsList(job);
    if (alreadyDefined && definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' already defined");
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
      throw new IllegalArgumentException("Named output '" + namedOutput +
        "' not defined");
    }
  }

  // Returns list of channel names.
  private static List<String> getNamedOutputsList(JobContext job) {
    List<String> names = new ArrayList<String>();
    StringTokenizer st = new StringTokenizer(
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
    while (st.hasMoreTokens()) {
      names.add(st.nextToken());
    }
    return names;
  }

  // Returns the named output OutputFormat.
  @SuppressWarnings("unchecked")
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
      JobContext job, String namedOutput) {
    return (Class<? extends OutputFormat<?, ?>>)
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
        OutputFormat.class);
  }

  // Returns the key class for a named output.
  private static Class<?> getNamedOutputKeyClass(JobContext job,
      String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
      Object.class);
  }

  // Returns the value class for a named output.
  private static Class<?> getNamedOutputValueClass(
      JobContext job, String namedOutput) {
    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
      null, Object.class);
  }

  /**
   * Adds a named output for the job.
   *
   * @param job job to add the named output
   * @param namedOutput named output name; it has to be a word, letters
   * and numbers only, and cannot be the word 'part' as
   * that is reserved for the default output.
   * @param outputFormatClass OutputFormat class.
   * @param keyClass key class
   * @param valueClass value class
   */
  @SuppressWarnings("unchecked")
  public static void addNamedOutput(Job job, String namedOutput,
      Class<? extends OutputFormat> outputFormatClass,
      Class<?> keyClass, Class<?> valueClass) {
    checkNamedOutputName(job, namedOutput, true);
    Configuration conf = job.getConfiguration();
    conf.set(MULTIPLE_OUTPUTS,
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
      OutputFormat.class);
    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
  }

  /**
   * Enables or disables counters for the named outputs.
   *
   * The counters group is the {@link MultipleOutputs} class name.
   * The names of the counters are the same as the named outputs. These
   * counters count the number of records written to each output name.
   * By default these counters are disabled.
   *
   * @param job job to enable counters
   * @param enabled indicates if the counters will be enabled or not.
   */
  public static void setCountersEnabled(Job job, boolean enabled) {
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
  }

  /**
   * Returns whether the counters for the named outputs are enabled.
   * By default these counters are disabled.
   *
   * @param job the job
   * @return TRUE if the counters are enabled, FALSE if they are disabled.
   */
  public static boolean getCountersEnabled(JobContext job) {
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
  }

  /**
   * Wraps RecordWriter to increment counters.
   */
  @SuppressWarnings("unchecked")
  private static class RecordWriterWithCounter extends RecordWriter {
    private RecordWriter writer;
    private String counterName;
    private TaskInputOutputContext context;

    public RecordWriterWithCounter(RecordWriter writer, String counterName,
        TaskInputOutputContext context) {
      this.writer = writer;
      this.counterName = counterName;
      this.context = context;
    }

    @SuppressWarnings({"unchecked"})
    public void write(Object key, Object value)
        throws IOException, InterruptedException {
      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
      writer.write(key, value);
    }

    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      writer.close(context);
    }
  }

  // instance code, to be used from Mapper/Reducer code

  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
  private Set<String> namedOutputs;
  private Map<String, RecordWriter<?, ?>> recordWriters;
  private boolean countersEnabled;

  /**
   * Creates and initializes multiple outputs support;
   * it should be instantiated in the Mapper/Reducer setup method.
   *
   * @param context the TaskInputOutputContext object
   */
  public MultipleOutputs(
      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
  }

  /**
   * Write key and value to the namedOutput.
   *
   * Output path is a unique file generated for the namedOutput.
   * For example: {namedOutput}-(m|r)-{part-number}
   *
   * @param namedOutput the named output name
   * @param key the key
   * @param value the value
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value)
      throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
  }

  /**
   * Write key and value to baseOutputPath using the namedOutput.
   *
   * @param namedOutput the named output name
   * @param key the key
   * @param value the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   */
  @SuppressWarnings("unchecked")
  public <K, V> void write(String namedOutput, K key, V value,
      String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!namedOutputs.contains(namedOutput)) {
      throw new IllegalArgumentException("Undefined named output '" +
        namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
  }

  /**
   * Write key and value to an output file name.
   *
   * Gets the record writer from the job's output format.
   * The job's output format should be a FileOutputFormat.
   *
   * @param key the key
   * @param value the value
   * @param baseOutputPath base-output path to write the record to.
   * Note: the framework will generate a unique filename for the baseOutputPath.
   */
  @SuppressWarnings("unchecked")
  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
      throws IOException, InterruptedException {
    checkBaseOutputPath(baseOutputPath);
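    // lazily create a task context that reuses the job's own output format
    // and key/value settings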
    if (jobOutputFormatContext == null) {
      jobOutputFormatContext =
        new TaskAttemptContextImpl(context.getConfiguration(),
          context.getTaskAttemptID(),
          new WrappedStatusReporter(context));
    }
    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
  }

  // by being synchronized MultipleOutputs can be used with a
  // MultithreadedMapper.
  @SuppressWarnings("unchecked")
  private synchronized RecordWriter getRecordWriter(
      TaskAttemptContext taskContext, String baseFileName)
      throws IOException, InterruptedException {

    // look for record-writer in the cache
    RecordWriter writer = recordWriters.get(baseFileName);

    // If not in cache, create a new one
    if (writer == null) {
      // get the record writer from context output format
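      // note: setOutputName only takes effect for FileOutputFormat-based
      // formats, which read this base name back when naming their output file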
      FileOutputFormat.setOutputName(taskContext, baseFileName);
      try {
        writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }

      // if counters are enabled, wrap the writer with context
      // to increment counters
      if (countersEnabled) {
        writer = new RecordWriterWithCounter(writer, baseFileName, context);
      }

      // add the record-writer to the cache
      recordWriters.put(baseFileName, writer);
    }
    return writer;
  }

  // Create a taskAttemptContext for the named output with
  // output format and output key/value types put in the context
  private TaskAttemptContext getContext(String nameOutput) throws IOException {

    TaskAttemptContext taskContext = taskContexts.get(nameOutput);

    if (taskContext != null) {
      return taskContext;
    }

    // The following trick leverages the instantiation of a record writer via
    // the job, thus supporting arbitrary output formats.
    Job job = new Job(context.getConfiguration());
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
    taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
      .getTaskAttemptID(), new WrappedStatusReporter(context));

    taskContexts.put(nameOutput, taskContext);

    return taskContext;
  }

  private static class WrappedStatusReporter extends StatusReporter {

    TaskAttemptContext context;

    public WrappedStatusReporter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public Counter getCounter(Enum<?> name) {
      return context.getCounter(name);
    }

    @Override
    public Counter getCounter(String group, String name) {
      return context.getCounter(group, name);
    }

    @Override
    public void progress() {
      context.progress();
    }

    @Override
    public float getProgress() {
      return context.getProgress();
    }

    @Override
    public void setStatus(String status) {
      context.setStatus(status);
    }
  }

  /**
   * Closes all the opened outputs.
   *
   * This should be called from the cleanup method of a map/reduce task.
   * If overridden, subclasses must invoke <code>super.close()</code> at the
   * end of their <code>close()</code>.
   */
  @SuppressWarnings("unchecked")
  public void close() throws IOException, InterruptedException {
    for (RecordWriter writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}