001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.mapreduce.Job;
027 import org.apache.hadoop.mapreduce.JobContext;
028 import org.apache.hadoop.mapreduce.OutputCommitter;
029 import org.apache.hadoop.mapreduce.OutputFormat;
030 import org.apache.hadoop.mapreduce.RecordWriter;
031 import org.apache.hadoop.mapreduce.TaskAttemptContext;
032 import org.apache.hadoop.util.ReflectionUtils;
033
034 /**
035 * A Convenience class that creates output lazily.
036 */
037 @InterfaceAudience.Public
038 @InterfaceStability.Stable
039 public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
040 public static String OUTPUT_FORMAT =
041 "mapreduce.output.lazyoutputformat.outputformat";
042 /**
043 * Set the underlying output format for LazyOutputFormat.
044 * @param job the {@link Job} to modify
045 * @param theClass the underlying class
046 */
047 @SuppressWarnings("unchecked")
048 public static void setOutputFormatClass(Job job,
049 Class<? extends OutputFormat> theClass) {
050 job.setOutputFormatClass(LazyOutputFormat.class);
051 job.getConfiguration().setClass(OUTPUT_FORMAT,
052 theClass, OutputFormat.class);
053 }
054
055 @SuppressWarnings("unchecked")
056 private void getBaseOutputFormat(Configuration conf)
057 throws IOException {
058 baseOut = ((OutputFormat<K, V>) ReflectionUtils.newInstance(
059 conf.getClass(OUTPUT_FORMAT, null), conf));
060 if (baseOut == null) {
061 throw new IOException("Output Format not set for LazyOutputFormat");
062 }
063 }
064
065 @Override
066 public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
067 throws IOException, InterruptedException {
068 if (baseOut == null) {
069 getBaseOutputFormat(context.getConfiguration());
070 }
071 return new LazyRecordWriter<K, V>(baseOut, context);
072 }
073
074 @Override
075 public void checkOutputSpecs(JobContext context)
076 throws IOException, InterruptedException {
077 if (baseOut == null) {
078 getBaseOutputFormat(context.getConfiguration());
079 }
080 super.checkOutputSpecs(context);
081 }
082
083 @Override
084 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
085 throws IOException, InterruptedException {
086 if (baseOut == null) {
087 getBaseOutputFormat(context.getConfiguration());
088 }
089 return super.getOutputCommitter(context);
090 }
091
092 /**
093 * A convenience class to be used with LazyOutputFormat
094 */
095 private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
096
097 final OutputFormat<K,V> outputFormat;
098 final TaskAttemptContext taskContext;
099
100 public LazyRecordWriter(OutputFormat<K,V> out,
101 TaskAttemptContext taskContext)
102 throws IOException, InterruptedException {
103 this.outputFormat = out;
104 this.taskContext = taskContext;
105 }
106
107 @Override
108 public void write(K key, V value) throws IOException, InterruptedException {
109 if (rawWriter == null) {
110 rawWriter = outputFormat.getRecordWriter(taskContext);
111 }
112 rawWriter.write(key, value);
113 }
114
115 @Override
116 public void close(TaskAttemptContext context)
117 throws IOException, InterruptedException {
118 if (rawWriter != null) {
119 rawWriter.close(context);
120 }
121 }
122
123 }
124 }