001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapred.lib;
019
020 import org.apache.hadoop.classification.InterfaceAudience;
021 import org.apache.hadoop.classification.InterfaceStability;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.hadoop.mapred.*;
024 import org.apache.hadoop.util.Progressable;
025
026 import java.io.IOException;
027 import java.util.*;
028
029 /**
030 * The MultipleOutputs class simplifies writting to additional outputs other
031 * than the job default output via the <code>OutputCollector</code> passed to
032 * the <code>map()</code> and <code>reduce()</code> methods of the
033 * <code>Mapper</code> and <code>Reducer</code> implementations.
034 * <p/>
035 * Each additional output, or named output, may be configured with its own
036 * <code>OutputFormat</code>, with its own key class and with its own value
037 * class.
038 * <p/>
039 * A named output can be a single file or a multi file. The later is refered as
040 * a multi named output.
041 * <p/>
042 * A multi named output is an unbound set of files all sharing the same
043 * <code>OutputFormat</code>, key class and value class configuration.
044 * <p/>
045 * When named outputs are used within a <code>Mapper</code> implementation,
046 * key/values written to a name output are not part of the reduce phase, only
047 * key/values written to the job <code>OutputCollector</code> are part of the
048 * reduce phase.
049 * <p/>
050 * MultipleOutputs supports counters, by default the are disabled. The counters
051 * group is the {@link MultipleOutputs} class name.
052 * </p>
053 * The names of the counters are the same as the named outputs. For multi
054 * named outputs the name of the counter is the concatenation of the named
055 * output, and underscore '_' and the multiname.
056 * <p/>
057 * Job configuration usage pattern is:
058 * <pre>
059 *
060 * JobConf conf = new JobConf();
061 *
062 * conf.setInputPath(inDir);
063 * FileOutputFormat.setOutputPath(conf, outDir);
064 *
065 * conf.setMapperClass(MOMap.class);
066 * conf.setReducerClass(MOReduce.class);
067 * ...
068 *
069 * // Defines additional single text based output 'text' for the job
070 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
071 * LongWritable.class, Text.class);
072 *
073 * // Defines additional multi sequencefile based output 'sequence' for the
074 * // job
075 * MultipleOutputs.addMultiNamedOutput(conf, "seq",
076 * SequenceFileOutputFormat.class,
077 * LongWritable.class, Text.class);
078 * ...
079 *
080 * JobClient jc = new JobClient();
081 * RunningJob job = jc.submitJob(conf);
082 *
083 * ...
084 * </pre>
085 * <p/>
086 * Job configuration usage pattern is:
087 * <pre>
088 *
089 * public class MOReduce implements
090 * Reducer<WritableComparable, Writable> {
091 * private MultipleOutputs mos;
092 *
093 * public void configure(JobConf conf) {
094 * ...
095 * mos = new MultipleOutputs(conf);
096 * }
097 *
098 * public void reduce(WritableComparable key, Iterator<Writable> values,
099 * OutputCollector output, Reporter reporter)
100 * throws IOException {
101 * ...
102 * mos.getCollector("text", reporter).collect(key, new Text("Hello"));
103 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
104 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
105 * ...
106 * }
107 *
108 * public void close() throws IOException {
109 * mos.close();
110 * ...
111 * }
112 *
113 * }
114 * </pre>
115 */
116 @InterfaceAudience.Public
117 @InterfaceStability.Stable
118 public class MultipleOutputs {
119
120 private static final String NAMED_OUTPUTS = "mo.namedOutputs";
121
122 private static final String MO_PREFIX = "mo.namedOutput.";
123
124 private static final String FORMAT = ".format";
125 private static final String KEY = ".key";
126 private static final String VALUE = ".value";
127 private static final String MULTI = ".multi";
128
129 private static final String COUNTERS_ENABLED = "mo.counters";
130
131 /**
132 * Counters group used by the counters of MultipleOutputs.
133 */
134 private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
135
136 /**
137 * Checks if a named output is alreadyDefined or not.
138 *
139 * @param conf job conf
140 * @param namedOutput named output names
141 * @param alreadyDefined whether the existence/non-existence of
142 * the named output is to be checked
143 * @throws IllegalArgumentException if the output name is alreadyDefined or
144 * not depending on the value of the
145 * 'alreadyDefined' parameter
146 */
147 private static void checkNamedOutput(JobConf conf, String namedOutput,
148 boolean alreadyDefined) {
149 List<String> definedChannels = getNamedOutputsList(conf);
150 if (alreadyDefined && definedChannels.contains(namedOutput)) {
151 throw new IllegalArgumentException("Named output '" + namedOutput +
152 "' already alreadyDefined");
153 } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
154 throw new IllegalArgumentException("Named output '" + namedOutput +
155 "' not defined");
156 }
157 }
158
159 /**
160 * Checks if a named output name is valid token.
161 *
162 * @param namedOutput named output Name
163 * @throws IllegalArgumentException if the output name is not valid.
164 */
165 private static void checkTokenName(String namedOutput) {
166 if (namedOutput == null || namedOutput.length() == 0) {
167 throw new IllegalArgumentException(
168 "Name cannot be NULL or emtpy");
169 }
170 for (char ch : namedOutput.toCharArray()) {
171 if ((ch >= 'A') && (ch <= 'Z')) {
172 continue;
173 }
174 if ((ch >= 'a') && (ch <= 'z')) {
175 continue;
176 }
177 if ((ch >= '0') && (ch <= '9')) {
178 continue;
179 }
180 throw new IllegalArgumentException(
181 "Name cannot be have a '" + ch + "' char");
182 }
183 }
184
185 /**
186 * Checks if a named output name is valid.
187 *
188 * @param namedOutput named output Name
189 * @throws IllegalArgumentException if the output name is not valid.
190 */
191 private static void checkNamedOutputName(String namedOutput) {
192 checkTokenName(namedOutput);
193 // name cannot be the name used for the default output
194 if (namedOutput.equals("part")) {
195 throw new IllegalArgumentException(
196 "Named output name cannot be 'part'");
197 }
198 }
199
200 /**
201 * Returns list of channel names.
202 *
203 * @param conf job conf
204 * @return List of channel Names
205 */
206 public static List<String> getNamedOutputsList(JobConf conf) {
207 List<String> names = new ArrayList<String>();
208 StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " ");
209 while (st.hasMoreTokens()) {
210 names.add(st.nextToken());
211 }
212 return names;
213 }
214
215
216 /**
217 * Returns if a named output is multiple.
218 *
219 * @param conf job conf
220 * @param namedOutput named output
221 * @return <code>true</code> if the name output is multi, <code>false</code>
222 * if it is single. If the name output is not defined it returns
223 * <code>false</code>
224 */
225 public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) {
226 checkNamedOutput(conf, namedOutput, false);
227 return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false);
228 }
229
230 /**
231 * Returns the named output OutputFormat.
232 *
233 * @param conf job conf
234 * @param namedOutput named output
235 * @return namedOutput OutputFormat
236 */
237 public static Class<? extends OutputFormat> getNamedOutputFormatClass(
238 JobConf conf, String namedOutput) {
239 checkNamedOutput(conf, namedOutput, false);
240 return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null,
241 OutputFormat.class);
242 }
243
244 /**
245 * Returns the key class for a named output.
246 *
247 * @param conf job conf
248 * @param namedOutput named output
249 * @return class for the named output key
250 */
251 public static Class<?> getNamedOutputKeyClass(JobConf conf,
252 String namedOutput) {
253 checkNamedOutput(conf, namedOutput, false);
254 return conf.getClass(MO_PREFIX + namedOutput + KEY, null,
255 Object.class);
256 }
257
258 /**
259 * Returns the value class for a named output.
260 *
261 * @param conf job conf
262 * @param namedOutput named output
263 * @return class of named output value
264 */
265 public static Class<?> getNamedOutputValueClass(JobConf conf,
266 String namedOutput) {
267 checkNamedOutput(conf, namedOutput, false);
268 return conf.getClass(MO_PREFIX + namedOutput + VALUE, null,
269 Object.class);
270 }
271
272 /**
273 * Adds a named output for the job.
274 * <p/>
275 *
276 * @param conf job conf to add the named output
277 * @param namedOutput named output name, it has to be a word, letters
278 * and numbers only, cannot be the word 'part' as
279 * that is reserved for the
280 * default output.
281 * @param outputFormatClass OutputFormat class.
282 * @param keyClass key class
283 * @param valueClass value class
284 */
285 public static void addNamedOutput(JobConf conf, String namedOutput,
286 Class<? extends OutputFormat> outputFormatClass,
287 Class<?> keyClass, Class<?> valueClass) {
288 addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass,
289 valueClass);
290 }
291
292 /**
293 * Adds a multi named output for the job.
294 * <p/>
295 *
296 * @param conf job conf to add the named output
297 * @param namedOutput named output name, it has to be a word, letters
298 * and numbers only, cannot be the word 'part' as
299 * that is reserved for the
300 * default output.
301 * @param outputFormatClass OutputFormat class.
302 * @param keyClass key class
303 * @param valueClass value class
304 */
305 public static void addMultiNamedOutput(JobConf conf, String namedOutput,
306 Class<? extends OutputFormat> outputFormatClass,
307 Class<?> keyClass, Class<?> valueClass) {
308 addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass,
309 valueClass);
310 }
311
312 /**
313 * Adds a named output for the job.
314 * <p/>
315 *
316 * @param conf job conf to add the named output
317 * @param namedOutput named output name, it has to be a word, letters
318 * and numbers only, cannot be the word 'part' as
319 * that is reserved for the
320 * default output.
321 * @param multi indicates if the named output is multi
322 * @param outputFormatClass OutputFormat class.
323 * @param keyClass key class
324 * @param valueClass value class
325 */
326 private static void addNamedOutput(JobConf conf, String namedOutput,
327 boolean multi,
328 Class<? extends OutputFormat> outputFormatClass,
329 Class<?> keyClass, Class<?> valueClass) {
330 checkNamedOutputName(namedOutput);
331 checkNamedOutput(conf, namedOutput, true);
332 conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput);
333 conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
334 OutputFormat.class);
335 conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
336 conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
337 conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi);
338 }
339
340 /**
341 * Enables or disables counters for the named outputs.
342 * <p/>
343 * By default these counters are disabled.
344 * <p/>
345 * MultipleOutputs supports counters, by default the are disabled.
346 * The counters group is the {@link MultipleOutputs} class name.
347 * </p>
348 * The names of the counters are the same as the named outputs. For multi
349 * named outputs the name of the counter is the concatenation of the named
350 * output, and underscore '_' and the multiname.
351 *
352 * @param conf job conf to enableadd the named output.
353 * @param enabled indicates if the counters will be enabled or not.
354 */
355 public static void setCountersEnabled(JobConf conf, boolean enabled) {
356 conf.setBoolean(COUNTERS_ENABLED, enabled);
357 }
358
359 /**
360 * Returns if the counters for the named outputs are enabled or not.
361 * <p/>
362 * By default these counters are disabled.
363 * <p/>
364 * MultipleOutputs supports counters, by default the are disabled.
365 * The counters group is the {@link MultipleOutputs} class name.
366 * </p>
367 * The names of the counters are the same as the named outputs. For multi
368 * named outputs the name of the counter is the concatenation of the named
369 * output, and underscore '_' and the multiname.
370 *
371 *
372 * @param conf job conf to enableadd the named output.
373 * @return TRUE if the counters are enabled, FALSE if they are disabled.
374 */
375 public static boolean getCountersEnabled(JobConf conf) {
376 return conf.getBoolean(COUNTERS_ENABLED, false);
377 }
378
379 // instance code, to be used from Mapper/Reducer code
380
381 private JobConf conf;
382 private OutputFormat outputFormat;
383 private Set<String> namedOutputs;
384 private Map<String, RecordWriter> recordWriters;
385 private boolean countersEnabled;
386
387 /**
388 * Creates and initializes multiple named outputs support, it should be
389 * instantiated in the Mapper/Reducer configure method.
390 *
391 * @param job the job configuration object
392 */
393 public MultipleOutputs(JobConf job) {
394 this.conf = job;
395 outputFormat = new InternalFileOutputFormat();
396 namedOutputs = Collections.unmodifiableSet(
397 new HashSet<String>(MultipleOutputs.getNamedOutputsList(job)));
398 recordWriters = new HashMap<String, RecordWriter>();
399 countersEnabled = getCountersEnabled(job);
400 }
401
402 /**
403 * Returns iterator with the defined name outputs.
404 *
405 * @return iterator with the defined named outputs
406 */
407 public Iterator<String> getNamedOutputs() {
408 return namedOutputs.iterator();
409 }
410
411
412 // by being synchronized MultipleOutputTask can be use with a
413 // MultithreaderMapRunner.
414 private synchronized RecordWriter getRecordWriter(String namedOutput,
415 String baseFileName,
416 final Reporter reporter)
417 throws IOException {
418 RecordWriter writer = recordWriters.get(baseFileName);
419 if (writer == null) {
420 if (countersEnabled && reporter == null) {
421 throw new IllegalArgumentException(
422 "Counters are enabled, Reporter cannot be NULL");
423 }
424 JobConf jobConf = new JobConf(conf);
425 jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput);
426 FileSystem fs = FileSystem.get(conf);
427 writer =
428 outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter);
429
430 if (countersEnabled) {
431 if (reporter == null) {
432 throw new IllegalArgumentException(
433 "Counters are enabled, Reporter cannot be NULL");
434 }
435 writer = new RecordWriterWithCounter(writer, baseFileName, reporter);
436 }
437
438 recordWriters.put(baseFileName, writer);
439 }
440 return writer;
441 }
442
443 private static class RecordWriterWithCounter implements RecordWriter {
444 private RecordWriter writer;
445 private String counterName;
446 private Reporter reporter;
447
448 public RecordWriterWithCounter(RecordWriter writer, String counterName,
449 Reporter reporter) {
450 this.writer = writer;
451 this.counterName = counterName;
452 this.reporter = reporter;
453 }
454
455 @SuppressWarnings({"unchecked"})
456 public void write(Object key, Object value) throws IOException {
457 reporter.incrCounter(COUNTERS_GROUP, counterName, 1);
458 writer.write(key, value);
459 }
460
461 public void close(Reporter reporter) throws IOException {
462 writer.close(reporter);
463 }
464 }
465
466 /**
467 * Gets the output collector for a named output.
468 * <p/>
469 *
470 * @param namedOutput the named output name
471 * @param reporter the reporter
472 * @return the output collector for the given named output
473 * @throws IOException thrown if output collector could not be created
474 */
475 @SuppressWarnings({"unchecked"})
476 public OutputCollector getCollector(String namedOutput, Reporter reporter)
477 throws IOException {
478 return getCollector(namedOutput, null, reporter);
479 }
480
481 /**
482 * Gets the output collector for a multi named output.
483 * <p/>
484 *
485 * @param namedOutput the named output name
486 * @param multiName the multi name part
487 * @param reporter the reporter
488 * @return the output collector for the given named output
489 * @throws IOException thrown if output collector could not be created
490 */
491 @SuppressWarnings({"unchecked"})
492 public OutputCollector getCollector(String namedOutput, String multiName,
493 Reporter reporter)
494 throws IOException {
495
496 checkNamedOutputName(namedOutput);
497 if (!namedOutputs.contains(namedOutput)) {
498 throw new IllegalArgumentException("Undefined named output '" +
499 namedOutput + "'");
500 }
501 boolean multi = isMultiNamedOutput(conf, namedOutput);
502
503 if (!multi && multiName != null) {
504 throw new IllegalArgumentException("Name output '" + namedOutput +
505 "' has not been defined as multi");
506 }
507 if (multi) {
508 checkTokenName(multiName);
509 }
510
511 String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput;
512
513 final RecordWriter writer =
514 getRecordWriter(namedOutput, baseFileName, reporter);
515
516 return new OutputCollector() {
517
518 @SuppressWarnings({"unchecked"})
519 public void collect(Object key, Object value) throws IOException {
520 writer.write(key, value);
521 }
522
523 };
524 }
525
526 /**
527 * Closes all the opened named outputs.
528 * <p/>
529 * If overriden subclasses must invoke <code>super.close()</code> at the
530 * end of their <code>close()</code>
531 *
532 * @throws java.io.IOException thrown if any of the MultipleOutput files
533 * could not be closed properly.
534 */
535 public void close() throws IOException {
536 for (RecordWriter writer : recordWriters.values()) {
537 writer.close(null);
538 }
539 }
540
541 private static class InternalFileOutputFormat extends
542 FileOutputFormat<Object, Object> {
543
544 public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
545
546 @SuppressWarnings({"unchecked"})
547 public RecordWriter<Object, Object> getRecordWriter(
548 FileSystem fs, JobConf job, String baseFileName, Progressable progress)
549 throws IOException {
550
551 String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
552 String fileName = getUniqueName(job, baseFileName);
553
554 // The following trick leverages the instantiation of a record writer via
555 // the job conf thus supporting arbitrary output formats.
556 JobConf outputConf = new JobConf(job);
557 outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
558 outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput));
559 outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput));
560 OutputFormat outputFormat = outputConf.getOutputFormat();
561 return outputFormat.getRecordWriter(fs, outputConf, fileName, progress);
562 }
563 }
564
565 }