001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapred.lib; 019 020import org.apache.hadoop.classification.InterfaceAudience; 021import org.apache.hadoop.classification.InterfaceStability; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.mapred.*; 024import org.apache.hadoop.util.Progressable; 025 026import java.io.IOException; 027import java.util.*; 028 029/** 030 * The MultipleOutputs class simplifies writting to additional outputs other 031 * than the job default output via the <code>OutputCollector</code> passed to 032 * the <code>map()</code> and <code>reduce()</code> methods of the 033 * <code>Mapper</code> and <code>Reducer</code> implementations. 034 * <p> 035 * Each additional output, or named output, may be configured with its own 036 * <code>OutputFormat</code>, with its own key class and with its own value 037 * class. 038 * <p> 039 * A named output can be a single file or a multi file. The later is refered as 040 * a multi named output. 041 * <p> 042 * A multi named output is an unbound set of files all sharing the same 043 * <code>OutputFormat</code>, key class and value class configuration. 044 * <p> 045 * When named outputs are used within a <code>Mapper</code> implementation, 046 * key/values written to a name output are not part of the reduce phase, only 047 * key/values written to the job <code>OutputCollector</code> are part of the 048 * reduce phase. 049 * <p> 050 * MultipleOutputs supports counters, by default the are disabled. The counters 051 * group is the {@link MultipleOutputs} class name. 052 * </p> 053 * The names of the counters are the same as the named outputs. For multi 054 * named outputs the name of the counter is the concatenation of the named 055 * output, and underscore '_' and the multiname. 056 * <p> 057 * Job configuration usage pattern is: 058 * <pre> 059 * 060 * JobConf conf = new JobConf(); 061 * 062 * conf.setInputPath(inDir); 063 * FileOutputFormat.setOutputPath(conf, outDir); 064 * 065 * conf.setMapperClass(MOMap.class); 066 * conf.setReducerClass(MOReduce.class); 067 * ... 068 * 069 * // Defines additional single text based output 'text' for the job 070 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 071 * LongWritable.class, Text.class); 072 * 073 * // Defines additional multi sequencefile based output 'sequence' for the 074 * // job 075 * MultipleOutputs.addMultiNamedOutput(conf, "seq", 076 * SequenceFileOutputFormat.class, 077 * LongWritable.class, Text.class); 078 * ... 079 * 080 * JobClient jc = new JobClient(); 081 * RunningJob job = jc.submitJob(conf); 082 * 083 * ... 084 * </pre> 085 * <p> 086 * Job configuration usage pattern is: 087 * <pre> 088 * 089 * public class MOReduce implements 090 * Reducer<WritableComparable, Writable> { 091 * private MultipleOutputs mos; 092 * 093 * public void configure(JobConf conf) { 094 * ... 095 * mos = new MultipleOutputs(conf); 096 * } 097 * 098 * public void reduce(WritableComparable key, Iterator<Writable> values, 099 * OutputCollector output, Reporter reporter) 100 * throws IOException { 101 * ... 102 * mos.getCollector("text", reporter).collect(key, new Text("Hello")); 103 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); 104 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); 105 * ... 106 * } 107 * 108 * public void close() throws IOException { 109 * mos.close(); 110 * ... 111 * } 112 * 113 * } 114 * </pre> 115 */ 116@InterfaceAudience.Public 117@InterfaceStability.Stable 118public class MultipleOutputs { 119 120 private static final String NAMED_OUTPUTS = "mo.namedOutputs"; 121 122 private static final String MO_PREFIX = "mo.namedOutput."; 123 124 private static final String FORMAT = ".format"; 125 private static final String KEY = ".key"; 126 private static final String VALUE = ".value"; 127 private static final String MULTI = ".multi"; 128 129 private static final String COUNTERS_ENABLED = "mo.counters"; 130 131 /** 132 * Counters group used by the counters of MultipleOutputs. 133 */ 134 private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); 135 136 /** 137 * Checks if a named output is alreadyDefined or not. 138 * 139 * @param conf job conf 140 * @param namedOutput named output names 141 * @param alreadyDefined whether the existence/non-existence of 142 * the named output is to be checked 143 * @throws IllegalArgumentException if the output name is alreadyDefined or 144 * not depending on the value of the 145 * 'alreadyDefined' parameter 146 */ 147 private static void checkNamedOutput(JobConf conf, String namedOutput, 148 boolean alreadyDefined) { 149 List<String> definedChannels = getNamedOutputsList(conf); 150 if (alreadyDefined && definedChannels.contains(namedOutput)) { 151 throw new IllegalArgumentException("Named output '" + namedOutput + 152 "' already alreadyDefined"); 153 } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { 154 throw new IllegalArgumentException("Named output '" + namedOutput + 155 "' not defined"); 156 } 157 } 158 159 /** 160 * Checks if a named output name is valid token. 161 * 162 * @param namedOutput named output Name 163 * @throws IllegalArgumentException if the output name is not valid. 164 */ 165 private static void checkTokenName(String namedOutput) { 166 if (namedOutput == null || namedOutput.length() == 0) { 167 throw new IllegalArgumentException( 168 "Name cannot be NULL or emtpy"); 169 } 170 for (char ch : namedOutput.toCharArray()) { 171 if ((ch >= 'A') && (ch <= 'Z')) { 172 continue; 173 } 174 if ((ch >= 'a') && (ch <= 'z')) { 175 continue; 176 } 177 if ((ch >= '0') && (ch <= '9')) { 178 continue; 179 } 180 throw new IllegalArgumentException( 181 "Name cannot be have a '" + ch + "' char"); 182 } 183 } 184 185 /** 186 * Checks if a named output name is valid. 187 * 188 * @param namedOutput named output Name 189 * @throws IllegalArgumentException if the output name is not valid. 190 */ 191 private static void checkNamedOutputName(String namedOutput) { 192 checkTokenName(namedOutput); 193 // name cannot be the name used for the default output 194 if (namedOutput.equals("part")) { 195 throw new IllegalArgumentException( 196 "Named output name cannot be 'part'"); 197 } 198 } 199 200 /** 201 * Returns list of channel names. 202 * 203 * @param conf job conf 204 * @return List of channel Names 205 */ 206 public static List<String> getNamedOutputsList(JobConf conf) { 207 List<String> names = new ArrayList<String>(); 208 StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " "); 209 while (st.hasMoreTokens()) { 210 names.add(st.nextToken()); 211 } 212 return names; 213 } 214 215 216 /** 217 * Returns if a named output is multiple. 218 * 219 * @param conf job conf 220 * @param namedOutput named output 221 * @return <code>true</code> if the name output is multi, <code>false</code> 222 * if it is single. If the name output is not defined it returns 223 * <code>false</code> 224 */ 225 public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) { 226 checkNamedOutput(conf, namedOutput, false); 227 return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false); 228 } 229 230 /** 231 * Returns the named output OutputFormat. 232 * 233 * @param conf job conf 234 * @param namedOutput named output 235 * @return namedOutput OutputFormat 236 */ 237 public static Class<? extends OutputFormat> getNamedOutputFormatClass( 238 JobConf conf, String namedOutput) { 239 checkNamedOutput(conf, namedOutput, false); 240 return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null, 241 OutputFormat.class); 242 } 243 244 /** 245 * Returns the key class for a named output. 246 * 247 * @param conf job conf 248 * @param namedOutput named output 249 * @return class for the named output key 250 */ 251 public static Class<?> getNamedOutputKeyClass(JobConf conf, 252 String namedOutput) { 253 checkNamedOutput(conf, namedOutput, false); 254 return conf.getClass(MO_PREFIX + namedOutput + KEY, null, 255 Object.class); 256 } 257 258 /** 259 * Returns the value class for a named output. 260 * 261 * @param conf job conf 262 * @param namedOutput named output 263 * @return class of named output value 264 */ 265 public static Class<?> getNamedOutputValueClass(JobConf conf, 266 String namedOutput) { 267 checkNamedOutput(conf, namedOutput, false); 268 return conf.getClass(MO_PREFIX + namedOutput + VALUE, null, 269 Object.class); 270 } 271 272 /** 273 * Adds a named output for the job. 274 * 275 * @param conf job conf to add the named output 276 * @param namedOutput named output name, it has to be a word, letters 277 * and numbers only, cannot be the word 'part' as 278 * that is reserved for the 279 * default output. 280 * @param outputFormatClass OutputFormat class. 281 * @param keyClass key class 282 * @param valueClass value class 283 */ 284 public static void addNamedOutput(JobConf conf, String namedOutput, 285 Class<? extends OutputFormat> outputFormatClass, 286 Class<?> keyClass, Class<?> valueClass) { 287 addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass, 288 valueClass); 289 } 290 291 /** 292 * Adds a multi named output for the job. 293 * 294 * @param conf job conf to add the named output 295 * @param namedOutput named output name, it has to be a word, letters 296 * and numbers only, cannot be the word 'part' as 297 * that is reserved for the 298 * default output. 299 * @param outputFormatClass OutputFormat class. 300 * @param keyClass key class 301 * @param valueClass value class 302 */ 303 public static void addMultiNamedOutput(JobConf conf, String namedOutput, 304 Class<? extends OutputFormat> outputFormatClass, 305 Class<?> keyClass, Class<?> valueClass) { 306 addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass, 307 valueClass); 308 } 309 310 /** 311 * Adds a named output for the job. 312 * 313 * @param conf job conf to add the named output 314 * @param namedOutput named output name, it has to be a word, letters 315 * and numbers only, cannot be the word 'part' as 316 * that is reserved for the 317 * default output. 318 * @param multi indicates if the named output is multi 319 * @param outputFormatClass OutputFormat class. 320 * @param keyClass key class 321 * @param valueClass value class 322 */ 323 private static void addNamedOutput(JobConf conf, String namedOutput, 324 boolean multi, 325 Class<? extends OutputFormat> outputFormatClass, 326 Class<?> keyClass, Class<?> valueClass) { 327 checkNamedOutputName(namedOutput); 328 checkNamedOutput(conf, namedOutput, true); 329 conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput); 330 conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, 331 OutputFormat.class); 332 conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); 333 conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); 334 conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi); 335 } 336 337 /** 338 * Enables or disables counters for the named outputs. 339 * <p> 340 * By default these counters are disabled. 341 * <p> 342 * MultipleOutputs supports counters, by default the are disabled. 343 * The counters group is the {@link MultipleOutputs} class name. 344 * </p> 345 * The names of the counters are the same as the named outputs. For multi 346 * named outputs the name of the counter is the concatenation of the named 347 * output, and underscore '_' and the multiname. 348 * 349 * @param conf job conf to enableadd the named output. 350 * @param enabled indicates if the counters will be enabled or not. 351 */ 352 public static void setCountersEnabled(JobConf conf, boolean enabled) { 353 conf.setBoolean(COUNTERS_ENABLED, enabled); 354 } 355 356 /** 357 * Returns if the counters for the named outputs are enabled or not. 358 * <p> 359 * By default these counters are disabled. 360 * <p> 361 * MultipleOutputs supports counters, by default the are disabled. 362 * The counters group is the {@link MultipleOutputs} class name. 363 * </p> 364 * The names of the counters are the same as the named outputs. For multi 365 * named outputs the name of the counter is the concatenation of the named 366 * output, and underscore '_' and the multiname. 367 * 368 * 369 * @param conf job conf to enableadd the named output. 370 * @return TRUE if the counters are enabled, FALSE if they are disabled. 371 */ 372 public static boolean getCountersEnabled(JobConf conf) { 373 return conf.getBoolean(COUNTERS_ENABLED, false); 374 } 375 376 // instance code, to be used from Mapper/Reducer code 377 378 private JobConf conf; 379 private OutputFormat outputFormat; 380 private Set<String> namedOutputs; 381 private Map<String, RecordWriter> recordWriters; 382 private boolean countersEnabled; 383 384 /** 385 * Creates and initializes multiple named outputs support, it should be 386 * instantiated in the Mapper/Reducer configure method. 387 * 388 * @param job the job configuration object 389 */ 390 public MultipleOutputs(JobConf job) { 391 this.conf = job; 392 outputFormat = new InternalFileOutputFormat(); 393 namedOutputs = Collections.unmodifiableSet( 394 new HashSet<String>(MultipleOutputs.getNamedOutputsList(job))); 395 recordWriters = new HashMap<String, RecordWriter>(); 396 countersEnabled = getCountersEnabled(job); 397 } 398 399 /** 400 * Returns iterator with the defined name outputs. 401 * 402 * @return iterator with the defined named outputs 403 */ 404 public Iterator<String> getNamedOutputs() { 405 return namedOutputs.iterator(); 406 } 407 408 409 // by being synchronized MultipleOutputTask can be use with a 410 // MultithreaderMapRunner. 411 private synchronized RecordWriter getRecordWriter(String namedOutput, 412 String baseFileName, 413 final Reporter reporter) 414 throws IOException { 415 RecordWriter writer = recordWriters.get(baseFileName); 416 if (writer == null) { 417 if (countersEnabled && reporter == null) { 418 throw new IllegalArgumentException( 419 "Counters are enabled, Reporter cannot be NULL"); 420 } 421 JobConf jobConf = new JobConf(conf); 422 jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput); 423 FileSystem fs = FileSystem.get(conf); 424 writer = 425 outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter); 426 427 if (countersEnabled) { 428 if (reporter == null) { 429 throw new IllegalArgumentException( 430 "Counters are enabled, Reporter cannot be NULL"); 431 } 432 writer = new RecordWriterWithCounter(writer, baseFileName, reporter); 433 } 434 435 recordWriters.put(baseFileName, writer); 436 } 437 return writer; 438 } 439 440 private static class RecordWriterWithCounter implements RecordWriter { 441 private RecordWriter writer; 442 private String counterName; 443 private Reporter reporter; 444 445 public RecordWriterWithCounter(RecordWriter writer, String counterName, 446 Reporter reporter) { 447 this.writer = writer; 448 this.counterName = counterName; 449 this.reporter = reporter; 450 } 451 452 @SuppressWarnings({"unchecked"}) 453 public void write(Object key, Object value) throws IOException { 454 reporter.incrCounter(COUNTERS_GROUP, counterName, 1); 455 writer.write(key, value); 456 } 457 458 public void close(Reporter reporter) throws IOException { 459 writer.close(reporter); 460 } 461 } 462 463 /** 464 * Gets the output collector for a named output. 465 * 466 * @param namedOutput the named output name 467 * @param reporter the reporter 468 * @return the output collector for the given named output 469 * @throws IOException thrown if output collector could not be created 470 */ 471 @SuppressWarnings({"unchecked"}) 472 public OutputCollector getCollector(String namedOutput, Reporter reporter) 473 throws IOException { 474 return getCollector(namedOutput, null, reporter); 475 } 476 477 /** 478 * Gets the output collector for a multi named output. 479 * 480 * @param namedOutput the named output name 481 * @param multiName the multi name part 482 * @param reporter the reporter 483 * @return the output collector for the given named output 484 * @throws IOException thrown if output collector could not be created 485 */ 486 @SuppressWarnings({"unchecked"}) 487 public OutputCollector getCollector(String namedOutput, String multiName, 488 Reporter reporter) 489 throws IOException { 490 491 checkNamedOutputName(namedOutput); 492 if (!namedOutputs.contains(namedOutput)) { 493 throw new IllegalArgumentException("Undefined named output '" + 494 namedOutput + "'"); 495 } 496 boolean multi = isMultiNamedOutput(conf, namedOutput); 497 498 if (!multi && multiName != null) { 499 throw new IllegalArgumentException("Name output '" + namedOutput + 500 "' has not been defined as multi"); 501 } 502 if (multi) { 503 checkTokenName(multiName); 504 } 505 506 String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput; 507 508 final RecordWriter writer = 509 getRecordWriter(namedOutput, baseFileName, reporter); 510 511 return new OutputCollector() { 512 513 @SuppressWarnings({"unchecked"}) 514 public void collect(Object key, Object value) throws IOException { 515 writer.write(key, value); 516 } 517 518 }; 519 } 520 521 /** 522 * Closes all the opened named outputs. 523 * <p> 524 * If overriden subclasses must invoke <code>super.close()</code> at the 525 * end of their <code>close()</code> 526 * 527 * @throws java.io.IOException thrown if any of the MultipleOutput files 528 * could not be closed properly. 529 */ 530 public void close() throws IOException { 531 for (RecordWriter writer : recordWriters.values()) { 532 writer.close(null); 533 } 534 } 535 536 private static class InternalFileOutputFormat extends 537 FileOutputFormat<Object, Object> { 538 539 public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput"; 540 541 @SuppressWarnings({"unchecked"}) 542 public RecordWriter<Object, Object> getRecordWriter( 543 FileSystem fs, JobConf job, String baseFileName, Progressable progress) 544 throws IOException { 545 546 String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null); 547 String fileName = getUniqueName(job, baseFileName); 548 549 // The following trick leverages the instantiation of a record writer via 550 // the job conf thus supporting arbitrary output formats. 551 JobConf outputConf = new JobConf(job); 552 outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput)); 553 outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput)); 554 outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput)); 555 OutputFormat outputFormat = outputConf.getOutputFormat(); 556 return outputFormat.getRecordWriter(fs, outputConf, fileName, progress); 557 } 558 } 559 560}