001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.output; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.mapreduce.Job; 027import org.apache.hadoop.mapreduce.JobContext; 028import org.apache.hadoop.mapreduce.OutputCommitter; 029import org.apache.hadoop.mapreduce.OutputFormat; 030import org.apache.hadoop.mapreduce.RecordWriter; 031import org.apache.hadoop.mapreduce.TaskAttemptContext; 032import org.apache.hadoop.util.ReflectionUtils; 033 034/** 035 * A Convenience class that creates output lazily. 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Stable 039public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> { 040 public static String OUTPUT_FORMAT = 041 "mapreduce.output.lazyoutputformat.outputformat"; 042 /** 043 * Set the underlying output format for LazyOutputFormat. 044 * @param job the {@link Job} to modify 045 * @param theClass the underlying class 046 */ 047 @SuppressWarnings("unchecked") 048 public static void setOutputFormatClass(Job job, 049 Class<? extends OutputFormat> theClass) { 050 job.setOutputFormatClass(LazyOutputFormat.class); 051 job.getConfiguration().setClass(OUTPUT_FORMAT, 052 theClass, OutputFormat.class); 053 } 054 055 @SuppressWarnings("unchecked") 056 private void getBaseOutputFormat(Configuration conf) 057 throws IOException { 058 baseOut = ((OutputFormat<K, V>) ReflectionUtils.newInstance( 059 conf.getClass(OUTPUT_FORMAT, null), conf)); 060 if (baseOut == null) { 061 throw new IOException("Output Format not set for LazyOutputFormat"); 062 } 063 } 064 065 @Override 066 public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 067 throws IOException, InterruptedException { 068 if (baseOut == null) { 069 getBaseOutputFormat(context.getConfiguration()); 070 } 071 return new LazyRecordWriter<K, V>(baseOut, context); 072 } 073 074 @Override 075 public void checkOutputSpecs(JobContext context) 076 throws IOException, InterruptedException { 077 if (baseOut == null) { 078 getBaseOutputFormat(context.getConfiguration()); 079 } 080 super.checkOutputSpecs(context); 081 } 082 083 @Override 084 public OutputCommitter getOutputCommitter(TaskAttemptContext context) 085 throws IOException, InterruptedException { 086 if (baseOut == null) { 087 getBaseOutputFormat(context.getConfiguration()); 088 } 089 return super.getOutputCommitter(context); 090 } 091 092 /** 093 * A convenience class to be used with LazyOutputFormat 094 */ 095 private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> { 096 097 final OutputFormat<K,V> outputFormat; 098 final TaskAttemptContext taskContext; 099 100 public LazyRecordWriter(OutputFormat<K,V> out, 101 TaskAttemptContext taskContext) 102 throws IOException, InterruptedException { 103 this.outputFormat = out; 104 this.taskContext = taskContext; 105 } 106 107 @Override 108 public void write(K key, V value) throws IOException, InterruptedException { 109 if (rawWriter == null) { 110 rawWriter = outputFormat.getRecordWriter(taskContext); 111 } 112 rawWriter.write(key, value); 113 } 114 115 @Override 116 public void close(TaskAttemptContext context) 117 throws IOException, InterruptedException { 118 if (rawWriter != null) { 119 rawWriter.close(context); 120 } 121 } 122 123 } 124}