001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapreduce.lib.output; 019 020import org.apache.hadoop.classification.InterfaceAudience; 021import org.apache.hadoop.classification.InterfaceStability; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.mapreduce.*; 024import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 025import org.apache.hadoop.util.ReflectionUtils; 026 027import java.io.IOException; 028import java.util.*; 029 030/** 031 * The MultipleOutputs class simplifies writing output data 032 * to multiple outputs 033 * 034 * <p> 035 * Case one: writing to additional outputs other than the job default output. 036 * 037 * Each additional output, or named output, may be configured with its own 038 * <code>OutputFormat</code>, with its own key class and with its own value 039 * class. 040 * 041 * <p> 042 * Case two: to write data to different files provided by user 043 * </p> 044 * 045 * <p> 046 * MultipleOutputs supports counters, by default they are disabled. The 047 * counters group is the {@link MultipleOutputs} class name. The names of the 048 * counters are the same as the output name. These count the number records 049 * written to each output name. 050 * </p> 051 * 052 * Usage pattern for job submission: 053 * <pre> 054 * 055 * Job job = new Job(); 056 * 057 * FileInputFormat.setInputPath(job, inDir); 058 * FileOutputFormat.setOutputPath(job, outDir); 059 * 060 * job.setMapperClass(MOMap.class); 061 * job.setReducerClass(MOReduce.class); 062 * ... 063 * 064 * // Defines additional single text based output 'text' for the job 065 * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, 066 * LongWritable.class, Text.class); 067 * 068 * // Defines additional sequence-file based output 'sequence' for the job 069 * MultipleOutputs.addNamedOutput(job, "seq", 070 * SequenceFileOutputFormat.class, 071 * LongWritable.class, Text.class); 072 * ... 073 * 074 * job.waitForCompletion(true); 075 * ... 076 * </pre> 077 * <p> 078 * Usage in Reducer: 079 * <pre> 080 * <K, V> String generateFileName(K k, V v) { 081 * return k.toString() + "_" + v.toString(); 082 * } 083 * 084 * public class MOReduce extends 085 * Reducer<WritableComparable, Writable,WritableComparable, Writable> { 086 * private MultipleOutputs mos; 087 * public void setup(Context context) { 088 * ... 089 * mos = new MultipleOutputs(context); 090 * } 091 * 092 * public void reduce(WritableComparable key, Iterator<Writable> values, 093 * Context context) 094 * throws IOException { 095 * ... 096 * mos.write("text", , key, new Text("Hello")); 097 * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); 098 * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); 099 * mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); 100 * ... 101 * } 102 * 103 * public void cleanup(Context) throws IOException { 104 * mos.close(); 105 * ... 106 * } 107 * 108 * } 109 * </pre> 110 */ 111@InterfaceAudience.Public 112@InterfaceStability.Stable 113public class MultipleOutputs<KEYOUT, VALUEOUT> { 114 115 private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs"; 116 117 private static final String MO_PREFIX = 118 "mapreduce.multipleoutputs.namedOutput."; 119 120 private static final String FORMAT = ".format"; 121 private static final String KEY = ".key"; 122 private static final String VALUE = ".value"; 123 private static final String COUNTERS_ENABLED = 124 "mapreduce.multipleoutputs.counters"; 125 126 /** 127 * Counters group used by the counters of MultipleOutputs. 128 */ 129 private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); 130 131 /** 132 * Cache for the taskContexts 133 */ 134 private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>(); 135 /** 136 * Cached TaskAttemptContext which uses the job's configured settings 137 */ 138 private TaskAttemptContext jobOutputFormatContext; 139 140 /** 141 * Checks if a named output name is valid token. 142 * 143 * @param namedOutput named output Name 144 * @throws IllegalArgumentException if the output name is not valid. 145 */ 146 private static void checkTokenName(String namedOutput) { 147 if (namedOutput == null || namedOutput.length() == 0) { 148 throw new IllegalArgumentException( 149 "Name cannot be NULL or emtpy"); 150 } 151 for (char ch : namedOutput.toCharArray()) { 152 if ((ch >= 'A') && (ch <= 'Z')) { 153 continue; 154 } 155 if ((ch >= 'a') && (ch <= 'z')) { 156 continue; 157 } 158 if ((ch >= '0') && (ch <= '9')) { 159 continue; 160 } 161 throw new IllegalArgumentException( 162 "Name cannot be have a '" + ch + "' char"); 163 } 164 } 165 166 /** 167 * Checks if output name is valid. 168 * 169 * name cannot be the name used for the default output 170 * @param outputPath base output Name 171 * @throws IllegalArgumentException if the output name is not valid. 172 */ 173 private static void checkBaseOutputPath(String outputPath) { 174 if (outputPath.equals(FileOutputFormat.PART)) { 175 throw new IllegalArgumentException("output name cannot be 'part'"); 176 } 177 } 178 179 /** 180 * Checks if a named output name is valid. 181 * 182 * @param namedOutput named output Name 183 * @throws IllegalArgumentException if the output name is not valid. 184 */ 185 private static void checkNamedOutputName(JobContext job, 186 String namedOutput, boolean alreadyDefined) { 187 checkTokenName(namedOutput); 188 checkBaseOutputPath(namedOutput); 189 List<String> definedChannels = getNamedOutputsList(job); 190 if (alreadyDefined && definedChannels.contains(namedOutput)) { 191 throw new IllegalArgumentException("Named output '" + namedOutput + 192 "' already alreadyDefined"); 193 } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { 194 throw new IllegalArgumentException("Named output '" + namedOutput + 195 "' not defined"); 196 } 197 } 198 199 // Returns list of channel names. 200 private static List<String> getNamedOutputsList(JobContext job) { 201 List<String> names = new ArrayList<String>(); 202 StringTokenizer st = new StringTokenizer( 203 job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " "); 204 while (st.hasMoreTokens()) { 205 names.add(st.nextToken()); 206 } 207 return names; 208 } 209 210 // Returns the named output OutputFormat. 211 @SuppressWarnings("unchecked") 212 private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass( 213 JobContext job, String namedOutput) { 214 return (Class<? extends OutputFormat<?, ?>>) 215 job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null, 216 OutputFormat.class); 217 } 218 219 // Returns the key class for a named output. 220 private static Class<?> getNamedOutputKeyClass(JobContext job, 221 String namedOutput) { 222 return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null, 223 Object.class); 224 } 225 226 // Returns the value class for a named output. 227 private static Class<?> getNamedOutputValueClass( 228 JobContext job, String namedOutput) { 229 return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE, 230 null, Object.class); 231 } 232 233 /** 234 * Adds a named output for the job. 235 * <p/> 236 * 237 * @param job job to add the named output 238 * @param namedOutput named output name, it has to be a word, letters 239 * and numbers only, cannot be the word 'part' as 240 * that is reserved for the default output. 241 * @param outputFormatClass OutputFormat class. 242 * @param keyClass key class 243 * @param valueClass value class 244 */ 245 @SuppressWarnings("unchecked") 246 public static void addNamedOutput(Job job, String namedOutput, 247 Class<? extends OutputFormat> outputFormatClass, 248 Class<?> keyClass, Class<?> valueClass) { 249 checkNamedOutputName(job, namedOutput, true); 250 Configuration conf = job.getConfiguration(); 251 conf.set(MULTIPLE_OUTPUTS, 252 conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput); 253 conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, 254 OutputFormat.class); 255 conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); 256 conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); 257 } 258 259 /** 260 * Enables or disables counters for the named outputs. 261 * 262 * The counters group is the {@link MultipleOutputs} class name. 263 * The names of the counters are the same as the named outputs. These 264 * counters count the number records written to each output name. 265 * By default these counters are disabled. 266 * 267 * @param job job to enable counters 268 * @param enabled indicates if the counters will be enabled or not. 269 */ 270 public static void setCountersEnabled(Job job, boolean enabled) { 271 job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled); 272 } 273 274 /** 275 * Returns if the counters for the named outputs are enabled or not. 276 * By default these counters are disabled. 277 * 278 * @param job the job 279 * @return TRUE if the counters are enabled, FALSE if they are disabled. 280 */ 281 public static boolean getCountersEnabled(JobContext job) { 282 return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false); 283 } 284 285 /** 286 * Wraps RecordWriter to increment counters. 287 */ 288 @SuppressWarnings("unchecked") 289 private static class RecordWriterWithCounter extends RecordWriter { 290 private RecordWriter writer; 291 private String counterName; 292 private TaskInputOutputContext context; 293 294 public RecordWriterWithCounter(RecordWriter writer, String counterName, 295 TaskInputOutputContext context) { 296 this.writer = writer; 297 this.counterName = counterName; 298 this.context = context; 299 } 300 301 @SuppressWarnings({"unchecked"}) 302 public void write(Object key, Object value) 303 throws IOException, InterruptedException { 304 context.getCounter(COUNTERS_GROUP, counterName).increment(1); 305 writer.write(key, value); 306 } 307 308 public void close(TaskAttemptContext context) 309 throws IOException, InterruptedException { 310 writer.close(context); 311 } 312 } 313 314 // instance code, to be used from Mapper/Reducer code 315 316 private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context; 317 private Set<String> namedOutputs; 318 private Map<String, RecordWriter<?, ?>> recordWriters; 319 private boolean countersEnabled; 320 321 /** 322 * Creates and initializes multiple outputs support, 323 * it should be instantiated in the Mapper/Reducer setup method. 324 * 325 * @param context the TaskInputOutputContext object 326 */ 327 public MultipleOutputs( 328 TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) { 329 this.context = context; 330 namedOutputs = Collections.unmodifiableSet( 331 new HashSet<String>(MultipleOutputs.getNamedOutputsList(context))); 332 recordWriters = new HashMap<String, RecordWriter<?, ?>>(); 333 countersEnabled = getCountersEnabled(context); 334 } 335 336 /** 337 * Write key and value to the namedOutput. 338 * 339 * Output path is a unique file generated for the namedOutput. 340 * For example, {namedOutput}-(m|r)-{part-number} 341 * 342 * @param namedOutput the named output name 343 * @param key the key 344 * @param value the value 345 */ 346 @SuppressWarnings("unchecked") 347 public <K, V> void write(String namedOutput, K key, V value) 348 throws IOException, InterruptedException { 349 write(namedOutput, key, value, namedOutput); 350 } 351 352 /** 353 * Write key and value to baseOutputPath using the namedOutput. 354 * 355 * @param namedOutput the named output name 356 * @param key the key 357 * @param value the value 358 * @param baseOutputPath base-output path to write the record to. 359 * Note: Framework will generate unique filename for the baseOutputPath 360 */ 361 @SuppressWarnings("unchecked") 362 public <K, V> void write(String namedOutput, K key, V value, 363 String baseOutputPath) throws IOException, InterruptedException { 364 checkNamedOutputName(context, namedOutput, false); 365 checkBaseOutputPath(baseOutputPath); 366 if (!namedOutputs.contains(namedOutput)) { 367 throw new IllegalArgumentException("Undefined named output '" + 368 namedOutput + "'"); 369 } 370 TaskAttemptContext taskContext = getContext(namedOutput); 371 getRecordWriter(taskContext, baseOutputPath).write(key, value); 372 } 373 374 /** 375 * Write key value to an output file name. 376 * 377 * Gets the record writer from job's output format. 378 * Job's output format should be a FileOutputFormat. 379 * 380 * @param key the key 381 * @param value the value 382 * @param baseOutputPath base-output path to write the record to. 383 * Note: Framework will generate unique filename for the baseOutputPath 384 */ 385 @SuppressWarnings("unchecked") 386 public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 387 throws IOException, InterruptedException { 388 checkBaseOutputPath(baseOutputPath); 389 if (jobOutputFormatContext == null) { 390 jobOutputFormatContext = 391 new TaskAttemptContextImpl(context.getConfiguration(), 392 context.getTaskAttemptID(), 393 new WrappedStatusReporter(context)); 394 } 395 getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value); 396 } 397 398 // by being synchronized MultipleOutputTask can be use with a 399 // MultithreadedMapper. 400 @SuppressWarnings("unchecked") 401 private synchronized RecordWriter getRecordWriter( 402 TaskAttemptContext taskContext, String baseFileName) 403 throws IOException, InterruptedException { 404 405 // look for record-writer in the cache 406 RecordWriter writer = recordWriters.get(baseFileName); 407 408 // If not in cache, create a new one 409 if (writer == null) { 410 // get the record writer from context output format 411 FileOutputFormat.setOutputName(taskContext, baseFileName); 412 try { 413 writer = ((OutputFormat) ReflectionUtils.newInstance( 414 taskContext.getOutputFormatClass(), taskContext.getConfiguration())) 415 .getRecordWriter(taskContext); 416 } catch (ClassNotFoundException e) { 417 throw new IOException(e); 418 } 419 420 // if counters are enabled, wrap the writer with context 421 // to increment counters 422 if (countersEnabled) { 423 writer = new RecordWriterWithCounter(writer, baseFileName, context); 424 } 425 426 // add the record-writer to the cache 427 recordWriters.put(baseFileName, writer); 428 } 429 return writer; 430 } 431 432 // Create a taskAttemptContext for the named output with 433 // output format and output key/value types put in the context 434 private TaskAttemptContext getContext(String nameOutput) throws IOException { 435 436 TaskAttemptContext taskContext = taskContexts.get(nameOutput); 437 438 if (taskContext != null) { 439 return taskContext; 440 } 441 442 // The following trick leverages the instantiation of a record writer via 443 // the job thus supporting arbitrary output formats. 444 Job job = new Job(context.getConfiguration()); 445 job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput)); 446 job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput)); 447 job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput)); 448 taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context 449 .getTaskAttemptID(), new WrappedStatusReporter(context)); 450 451 taskContexts.put(nameOutput, taskContext); 452 453 return taskContext; 454 } 455 456 private static class WrappedStatusReporter extends StatusReporter { 457 458 TaskAttemptContext context; 459 460 public WrappedStatusReporter(TaskAttemptContext context) { 461 this.context = context; 462 } 463 464 @Override 465 public Counter getCounter(Enum<?> name) { 466 return context.getCounter(name); 467 } 468 469 @Override 470 public Counter getCounter(String group, String name) { 471 return context.getCounter(group, name); 472 } 473 474 @Override 475 public void progress() { 476 context.progress(); 477 } 478 479 @Override 480 public float getProgress() { 481 return context.getProgress(); 482 } 483 484 @Override 485 public void setStatus(String status) { 486 context.setStatus(status); 487 } 488 } 489 490 /** 491 * Closes all the opened outputs. 492 * 493 * This should be called from cleanup method of map/reduce task. 494 * If overridden subclasses must invoke <code>super.close()</code> at the 495 * end of their <code>close()</code> 496 * 497 */ 498 @SuppressWarnings("unchecked") 499 public void close() throws IOException, InterruptedException { 500 for (RecordWriter writer : recordWriters.values()) { 501 writer.close(context); 502 } 503 } 504}