001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.mapreduce.Job; 027import org.apache.hadoop.mapreduce.JobContext; 028import org.apache.hadoop.mapreduce.OutputCommitter; 029import org.apache.hadoop.mapreduce.OutputFormat; 030import org.apache.hadoop.mapreduce.RecordWriter; 031import org.apache.hadoop.mapreduce.TaskAttemptContext; 032import org.apache.hadoop.util.ReflectionUtils; 033 034/** 035 * A Convenience class that creates output lazily. 036 * Use in conjuction with org.apache.hadoop.mapreduce.lib.output.MultipleOutputs to recreate the 037 * behaviour of org.apache.hadoop.mapred.lib.MultipleTextOutputFormat (etc) of the old Hadoop API. 038 * See {@link MultipleOutputs} documentation for more information. 039 */ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> { 043 public static String OUTPUT_FORMAT = 044 "mapreduce.output.lazyoutputformat.outputformat"; 045 /** 046 * Set the underlying output format for LazyOutputFormat. 047 * @param job the {@link Job} to modify 048 * @param theClass the underlying class 049 */ 050 @SuppressWarnings("unchecked") 051 public static void setOutputFormatClass(Job job, 052 Class<? extends OutputFormat> theClass) { 053 job.setOutputFormatClass(LazyOutputFormat.class); 054 job.getConfiguration().setClass(OUTPUT_FORMAT, 055 theClass, OutputFormat.class); 056 } 057 058 @SuppressWarnings("unchecked") 059 private void getBaseOutputFormat(Configuration conf) 060 throws IOException { 061 baseOut = ((OutputFormat<K, V>) ReflectionUtils.newInstance( 062 conf.getClass(OUTPUT_FORMAT, null), conf)); 063 if (baseOut == null) { 064 throw new IOException("Output Format not set for LazyOutputFormat"); 065 } 066 } 067 068 @Override 069 public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 070 throws IOException, InterruptedException { 071 if (baseOut == null) { 072 getBaseOutputFormat(context.getConfiguration()); 073 } 074 return new LazyRecordWriter<K, V>(baseOut, context); 075 } 076 077 @Override 078 public void checkOutputSpecs(JobContext context) 079 throws IOException, InterruptedException { 080 if (baseOut == null) { 081 getBaseOutputFormat(context.getConfiguration()); 082 } 083 super.checkOutputSpecs(context); 084 } 085 086 @Override 087 public OutputCommitter getOutputCommitter(TaskAttemptContext context) 088 throws IOException, InterruptedException { 089 if (baseOut == null) { 090 getBaseOutputFormat(context.getConfiguration()); 091 } 092 return super.getOutputCommitter(context); 093 } 094 095 /** 096 * A convenience class to be used with LazyOutputFormat 097 */ 098 private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> { 099 100 final OutputFormat<K,V> outputFormat; 101 final TaskAttemptContext taskContext; 102 103 public LazyRecordWriter(OutputFormat<K,V> out, 104 TaskAttemptContext taskContext) 105 throws IOException, InterruptedException { 106 this.outputFormat = out; 107 this.taskContext = taskContext; 108 } 109 110 @Override 111 public void write(K key, V value) throws IOException, InterruptedException { 112 if (rawWriter == null) { 113 rawWriter = outputFormat.getRecordWriter(taskContext); 114 } 115 rawWriter.write(key, value); 116 } 117 118 @Override 119 public void close(TaskAttemptContext context) 120 throws IOException, InterruptedException { 121 if (rawWriter != null) { 122 rawWriter.close(context); 123 } 124 } 125 126 } 127}