001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapred.lib; 019 020import org.apache.hadoop.classification.InterfaceAudience; 021import org.apache.hadoop.classification.InterfaceStability; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.mapred.*; 024import org.apache.hadoop.util.Progressable; 025 026import java.io.IOException; 027import java.util.*; 028 029/** 030 * The MultipleOutputs class simplifies writting to additional outputs other 031 * than the job default output via the <code>OutputCollector</code> passed to 032 * the <code>map()</code> and <code>reduce()</code> methods of the 033 * <code>Mapper</code> and <code>Reducer</code> implementations. 034 * <p/> 035 * Each additional output, or named output, may be configured with its own 036 * <code>OutputFormat</code>, with its own key class and with its own value 037 * class. 038 * <p/> 039 * A named output can be a single file or a multi file. The later is refered as 040 * a multi named output. 041 * <p/> 042 * A multi named output is an unbound set of files all sharing the same 043 * <code>OutputFormat</code>, key class and value class configuration. 044 * <p/> 045 * When named outputs are used within a <code>Mapper</code> implementation, 046 * key/values written to a name output are not part of the reduce phase, only 047 * key/values written to the job <code>OutputCollector</code> are part of the 048 * reduce phase. 049 * <p/> 050 * MultipleOutputs supports counters, by default the are disabled. The counters 051 * group is the {@link MultipleOutputs} class name. 052 * </p> 053 * The names of the counters are the same as the named outputs. For multi 054 * named outputs the name of the counter is the concatenation of the named 055 * output, and underscore '_' and the multiname. 056 * <p/> 057 * Job configuration usage pattern is: 058 * <pre> 059 * 060 * JobConf conf = new JobConf(); 061 * 062 * conf.setInputPath(inDir); 063 * FileOutputFormat.setOutputPath(conf, outDir); 064 * 065 * conf.setMapperClass(MOMap.class); 066 * conf.setReducerClass(MOReduce.class); 067 * ... 068 * 069 * // Defines additional single text based output 'text' for the job 070 * MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 071 * LongWritable.class, Text.class); 072 * 073 * // Defines additional multi sequencefile based output 'sequence' for the 074 * // job 075 * MultipleOutputs.addMultiNamedOutput(conf, "seq", 076 * SequenceFileOutputFormat.class, 077 * LongWritable.class, Text.class); 078 * ... 079 * 080 * JobClient jc = new JobClient(); 081 * RunningJob job = jc.submitJob(conf); 082 * 083 * ... 084 * </pre> 085 * <p/> 086 * Job configuration usage pattern is: 087 * <pre> 088 * 089 * public class MOReduce implements 090 * Reducer<WritableComparable, Writable> { 091 * private MultipleOutputs mos; 092 * 093 * public void configure(JobConf conf) { 094 * ... 095 * mos = new MultipleOutputs(conf); 096 * } 097 * 098 * public void reduce(WritableComparable key, Iterator<Writable> values, 099 * OutputCollector output, Reporter reporter) 100 * throws IOException { 101 * ... 102 * mos.getCollector("text", reporter).collect(key, new Text("Hello")); 103 * mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); 104 * mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); 105 * ... 106 * } 107 * 108 * public void close() throws IOException { 109 * mos.close(); 110 * ... 111 * } 112 * 113 * } 114 * </pre> 115 */ 116@InterfaceAudience.Public 117@InterfaceStability.Stable 118public class MultipleOutputs { 119 120 private static final String NAMED_OUTPUTS = "mo.namedOutputs"; 121 122 private static final String MO_PREFIX = "mo.namedOutput."; 123 124 private static final String FORMAT = ".format"; 125 private static final String KEY = ".key"; 126 private static final String VALUE = ".value"; 127 private static final String MULTI = ".multi"; 128 129 private static final String COUNTERS_ENABLED = "mo.counters"; 130 131 /** 132 * Counters group used by the counters of MultipleOutputs. 133 */ 134 private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); 135 136 /** 137 * Checks if a named output is alreadyDefined or not. 138 * 139 * @param conf job conf 140 * @param namedOutput named output names 141 * @param alreadyDefined whether the existence/non-existence of 142 * the named output is to be checked 143 * @throws IllegalArgumentException if the output name is alreadyDefined or 144 * not depending on the value of the 145 * 'alreadyDefined' parameter 146 */ 147 private static void checkNamedOutput(JobConf conf, String namedOutput, 148 boolean alreadyDefined) { 149 List<String> definedChannels = getNamedOutputsList(conf); 150 if (alreadyDefined && definedChannels.contains(namedOutput)) { 151 throw new IllegalArgumentException("Named output '" + namedOutput + 152 "' already alreadyDefined"); 153 } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { 154 throw new IllegalArgumentException("Named output '" + namedOutput + 155 "' not defined"); 156 } 157 } 158 159 /** 160 * Checks if a named output name is valid token. 161 * 162 * @param namedOutput named output Name 163 * @throws IllegalArgumentException if the output name is not valid. 164 */ 165 private static void checkTokenName(String namedOutput) { 166 if (namedOutput == null || namedOutput.length() == 0) { 167 throw new IllegalArgumentException( 168 "Name cannot be NULL or emtpy"); 169 } 170 for (char ch : namedOutput.toCharArray()) { 171 if ((ch >= 'A') && (ch <= 'Z')) { 172 continue; 173 } 174 if ((ch >= 'a') && (ch <= 'z')) { 175 continue; 176 } 177 if ((ch >= '0') && (ch <= '9')) { 178 continue; 179 } 180 throw new IllegalArgumentException( 181 "Name cannot be have a '" + ch + "' char"); 182 } 183 } 184 185 /** 186 * Checks if a named output name is valid. 187 * 188 * @param namedOutput named output Name 189 * @throws IllegalArgumentException if the output name is not valid. 190 */ 191 private static void checkNamedOutputName(String namedOutput) { 192 checkTokenName(namedOutput); 193 // name cannot be the name used for the default output 194 if (namedOutput.equals("part")) { 195 throw new IllegalArgumentException( 196 "Named output name cannot be 'part'"); 197 } 198 } 199 200 /** 201 * Returns list of channel names. 202 * 203 * @param conf job conf 204 * @return List of channel Names 205 */ 206 public static List<String> getNamedOutputsList(JobConf conf) { 207 List<String> names = new ArrayList<String>(); 208 StringTokenizer st = new StringTokenizer(conf.get(NAMED_OUTPUTS, ""), " "); 209 while (st.hasMoreTokens()) { 210 names.add(st.nextToken()); 211 } 212 return names; 213 } 214 215 216 /** 217 * Returns if a named output is multiple. 218 * 219 * @param conf job conf 220 * @param namedOutput named output 221 * @return <code>true</code> if the name output is multi, <code>false</code> 222 * if it is single. If the name output is not defined it returns 223 * <code>false</code> 224 */ 225 public static boolean isMultiNamedOutput(JobConf conf, String namedOutput) { 226 checkNamedOutput(conf, namedOutput, false); 227 return conf.getBoolean(MO_PREFIX + namedOutput + MULTI, false); 228 } 229 230 /** 231 * Returns the named output OutputFormat. 232 * 233 * @param conf job conf 234 * @param namedOutput named output 235 * @return namedOutput OutputFormat 236 */ 237 public static Class<? extends OutputFormat> getNamedOutputFormatClass( 238 JobConf conf, String namedOutput) { 239 checkNamedOutput(conf, namedOutput, false); 240 return conf.getClass(MO_PREFIX + namedOutput + FORMAT, null, 241 OutputFormat.class); 242 } 243 244 /** 245 * Returns the key class for a named output. 246 * 247 * @param conf job conf 248 * @param namedOutput named output 249 * @return class for the named output key 250 */ 251 public static Class<?> getNamedOutputKeyClass(JobConf conf, 252 String namedOutput) { 253 checkNamedOutput(conf, namedOutput, false); 254 return conf.getClass(MO_PREFIX + namedOutput + KEY, null, 255 Object.class); 256 } 257 258 /** 259 * Returns the value class for a named output. 260 * 261 * @param conf job conf 262 * @param namedOutput named output 263 * @return class of named output value 264 */ 265 public static Class<?> getNamedOutputValueClass(JobConf conf, 266 String namedOutput) { 267 checkNamedOutput(conf, namedOutput, false); 268 return conf.getClass(MO_PREFIX + namedOutput + VALUE, null, 269 Object.class); 270 } 271 272 /** 273 * Adds a named output for the job. 274 * <p/> 275 * 276 * @param conf job conf to add the named output 277 * @param namedOutput named output name, it has to be a word, letters 278 * and numbers only, cannot be the word 'part' as 279 * that is reserved for the 280 * default output. 281 * @param outputFormatClass OutputFormat class. 282 * @param keyClass key class 283 * @param valueClass value class 284 */ 285 public static void addNamedOutput(JobConf conf, String namedOutput, 286 Class<? extends OutputFormat> outputFormatClass, 287 Class<?> keyClass, Class<?> valueClass) { 288 addNamedOutput(conf, namedOutput, false, outputFormatClass, keyClass, 289 valueClass); 290 } 291 292 /** 293 * Adds a multi named output for the job. 294 * <p/> 295 * 296 * @param conf job conf to add the named output 297 * @param namedOutput named output name, it has to be a word, letters 298 * and numbers only, cannot be the word 'part' as 299 * that is reserved for the 300 * default output. 301 * @param outputFormatClass OutputFormat class. 302 * @param keyClass key class 303 * @param valueClass value class 304 */ 305 public static void addMultiNamedOutput(JobConf conf, String namedOutput, 306 Class<? extends OutputFormat> outputFormatClass, 307 Class<?> keyClass, Class<?> valueClass) { 308 addNamedOutput(conf, namedOutput, true, outputFormatClass, keyClass, 309 valueClass); 310 } 311 312 /** 313 * Adds a named output for the job. 314 * <p/> 315 * 316 * @param conf job conf to add the named output 317 * @param namedOutput named output name, it has to be a word, letters 318 * and numbers only, cannot be the word 'part' as 319 * that is reserved for the 320 * default output. 321 * @param multi indicates if the named output is multi 322 * @param outputFormatClass OutputFormat class. 323 * @param keyClass key class 324 * @param valueClass value class 325 */ 326 private static void addNamedOutput(JobConf conf, String namedOutput, 327 boolean multi, 328 Class<? extends OutputFormat> outputFormatClass, 329 Class<?> keyClass, Class<?> valueClass) { 330 checkNamedOutputName(namedOutput); 331 checkNamedOutput(conf, namedOutput, true); 332 conf.set(NAMED_OUTPUTS, conf.get(NAMED_OUTPUTS, "") + " " + namedOutput); 333 conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, 334 OutputFormat.class); 335 conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); 336 conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); 337 conf.setBoolean(MO_PREFIX + namedOutput + MULTI, multi); 338 } 339 340 /** 341 * Enables or disables counters for the named outputs. 342 * <p/> 343 * By default these counters are disabled. 344 * <p/> 345 * MultipleOutputs supports counters, by default the are disabled. 346 * The counters group is the {@link MultipleOutputs} class name. 347 * </p> 348 * The names of the counters are the same as the named outputs. For multi 349 * named outputs the name of the counter is the concatenation of the named 350 * output, and underscore '_' and the multiname. 351 * 352 * @param conf job conf to enableadd the named output. 353 * @param enabled indicates if the counters will be enabled or not. 354 */ 355 public static void setCountersEnabled(JobConf conf, boolean enabled) { 356 conf.setBoolean(COUNTERS_ENABLED, enabled); 357 } 358 359 /** 360 * Returns if the counters for the named outputs are enabled or not. 361 * <p/> 362 * By default these counters are disabled. 363 * <p/> 364 * MultipleOutputs supports counters, by default the are disabled. 365 * The counters group is the {@link MultipleOutputs} class name. 366 * </p> 367 * The names of the counters are the same as the named outputs. For multi 368 * named outputs the name of the counter is the concatenation of the named 369 * output, and underscore '_' and the multiname. 370 * 371 * 372 * @param conf job conf to enableadd the named output. 373 * @return TRUE if the counters are enabled, FALSE if they are disabled. 374 */ 375 public static boolean getCountersEnabled(JobConf conf) { 376 return conf.getBoolean(COUNTERS_ENABLED, false); 377 } 378 379 // instance code, to be used from Mapper/Reducer code 380 381 private JobConf conf; 382 private OutputFormat outputFormat; 383 private Set<String> namedOutputs; 384 private Map<String, RecordWriter> recordWriters; 385 private boolean countersEnabled; 386 387 /** 388 * Creates and initializes multiple named outputs support, it should be 389 * instantiated in the Mapper/Reducer configure method. 390 * 391 * @param job the job configuration object 392 */ 393 public MultipleOutputs(JobConf job) { 394 this.conf = job; 395 outputFormat = new InternalFileOutputFormat(); 396 namedOutputs = Collections.unmodifiableSet( 397 new HashSet<String>(MultipleOutputs.getNamedOutputsList(job))); 398 recordWriters = new HashMap<String, RecordWriter>(); 399 countersEnabled = getCountersEnabled(job); 400 } 401 402 /** 403 * Returns iterator with the defined name outputs. 404 * 405 * @return iterator with the defined named outputs 406 */ 407 public Iterator<String> getNamedOutputs() { 408 return namedOutputs.iterator(); 409 } 410 411 412 // by being synchronized MultipleOutputTask can be use with a 413 // MultithreaderMapRunner. 414 private synchronized RecordWriter getRecordWriter(String namedOutput, 415 String baseFileName, 416 final Reporter reporter) 417 throws IOException { 418 RecordWriter writer = recordWriters.get(baseFileName); 419 if (writer == null) { 420 if (countersEnabled && reporter == null) { 421 throw new IllegalArgumentException( 422 "Counters are enabled, Reporter cannot be NULL"); 423 } 424 JobConf jobConf = new JobConf(conf); 425 jobConf.set(InternalFileOutputFormat.CONFIG_NAMED_OUTPUT, namedOutput); 426 FileSystem fs = FileSystem.get(conf); 427 writer = 428 outputFormat.getRecordWriter(fs, jobConf, baseFileName, reporter); 429 430 if (countersEnabled) { 431 if (reporter == null) { 432 throw new IllegalArgumentException( 433 "Counters are enabled, Reporter cannot be NULL"); 434 } 435 writer = new RecordWriterWithCounter(writer, baseFileName, reporter); 436 } 437 438 recordWriters.put(baseFileName, writer); 439 } 440 return writer; 441 } 442 443 private static class RecordWriterWithCounter implements RecordWriter { 444 private RecordWriter writer; 445 private String counterName; 446 private Reporter reporter; 447 448 public RecordWriterWithCounter(RecordWriter writer, String counterName, 449 Reporter reporter) { 450 this.writer = writer; 451 this.counterName = counterName; 452 this.reporter = reporter; 453 } 454 455 @SuppressWarnings({"unchecked"}) 456 public void write(Object key, Object value) throws IOException { 457 reporter.incrCounter(COUNTERS_GROUP, counterName, 1); 458 writer.write(key, value); 459 } 460 461 public void close(Reporter reporter) throws IOException { 462 writer.close(reporter); 463 } 464 } 465 466 /** 467 * Gets the output collector for a named output. 468 * <p/> 469 * 470 * @param namedOutput the named output name 471 * @param reporter the reporter 472 * @return the output collector for the given named output 473 * @throws IOException thrown if output collector could not be created 474 */ 475 @SuppressWarnings({"unchecked"}) 476 public OutputCollector getCollector(String namedOutput, Reporter reporter) 477 throws IOException { 478 return getCollector(namedOutput, null, reporter); 479 } 480 481 /** 482 * Gets the output collector for a multi named output. 483 * <p/> 484 * 485 * @param namedOutput the named output name 486 * @param multiName the multi name part 487 * @param reporter the reporter 488 * @return the output collector for the given named output 489 * @throws IOException thrown if output collector could not be created 490 */ 491 @SuppressWarnings({"unchecked"}) 492 public OutputCollector getCollector(String namedOutput, String multiName, 493 Reporter reporter) 494 throws IOException { 495 496 checkNamedOutputName(namedOutput); 497 if (!namedOutputs.contains(namedOutput)) { 498 throw new IllegalArgumentException("Undefined named output '" + 499 namedOutput + "'"); 500 } 501 boolean multi = isMultiNamedOutput(conf, namedOutput); 502 503 if (!multi && multiName != null) { 504 throw new IllegalArgumentException("Name output '" + namedOutput + 505 "' has not been defined as multi"); 506 } 507 if (multi) { 508 checkTokenName(multiName); 509 } 510 511 String baseFileName = (multi) ? namedOutput + "_" + multiName : namedOutput; 512 513 final RecordWriter writer = 514 getRecordWriter(namedOutput, baseFileName, reporter); 515 516 return new OutputCollector() { 517 518 @SuppressWarnings({"unchecked"}) 519 public void collect(Object key, Object value) throws IOException { 520 writer.write(key, value); 521 } 522 523 }; 524 } 525 526 /** 527 * Closes all the opened named outputs. 528 * <p/> 529 * If overriden subclasses must invoke <code>super.close()</code> at the 530 * end of their <code>close()</code> 531 * 532 * @throws java.io.IOException thrown if any of the MultipleOutput files 533 * could not be closed properly. 534 */ 535 public void close() throws IOException { 536 for (RecordWriter writer : recordWriters.values()) { 537 writer.close(null); 538 } 539 } 540 541 private static class InternalFileOutputFormat extends 542 FileOutputFormat<Object, Object> { 543 544 public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput"; 545 546 @SuppressWarnings({"unchecked"}) 547 public RecordWriter<Object, Object> getRecordWriter( 548 FileSystem fs, JobConf job, String baseFileName, Progressable progress) 549 throws IOException { 550 551 String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null); 552 String fileName = getUniqueName(job, baseFileName); 553 554 // The following trick leverages the instantiation of a record writer via 555 // the job conf thus supporting arbitrary output formats. 556 JobConf outputConf = new JobConf(job); 557 outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput)); 558 outputConf.setOutputKeyClass(getNamedOutputKeyClass(job, nameOutput)); 559 outputConf.setOutputValueClass(getNamedOutputValueClass(job, nameOutput)); 560 OutputFormat outputFormat = outputConf.getOutputFormat(); 561 return outputFormat.getRecordWriter(fs, outputConf, fileName, progress); 562 } 563 } 564 565}