001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.mapreduce.Job;
027import org.apache.hadoop.mapreduce.JobContext;
028import org.apache.hadoop.mapreduce.OutputCommitter;
029import org.apache.hadoop.mapreduce.OutputFormat;
030import org.apache.hadoop.mapreduce.RecordWriter;
031import org.apache.hadoop.mapreduce.TaskAttemptContext;
032import org.apache.hadoop.util.ReflectionUtils;
033
034/**
035 * A Convenience class that creates output lazily.  
036 */
037@InterfaceAudience.Public
038@InterfaceStability.Stable
039public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
040  public static String OUTPUT_FORMAT = 
041    "mapreduce.output.lazyoutputformat.outputformat";
042  /**
043   * Set the underlying output format for LazyOutputFormat.
044   * @param job the {@link Job} to modify
045   * @param theClass the underlying class
046   */
047  @SuppressWarnings("unchecked")
048  public static void  setOutputFormatClass(Job job, 
049                                     Class<? extends OutputFormat> theClass) {
050      job.setOutputFormatClass(LazyOutputFormat.class);
051      job.getConfiguration().setClass(OUTPUT_FORMAT, 
052          theClass, OutputFormat.class);
053  }
054
055  @SuppressWarnings("unchecked")
056  private void getBaseOutputFormat(Configuration conf) 
057  throws IOException {
058    baseOut =  ((OutputFormat<K, V>) ReflectionUtils.newInstance(
059      conf.getClass(OUTPUT_FORMAT, null), conf));
060    if (baseOut == null) {
061      throw new IOException("Output Format not set for LazyOutputFormat");
062    }
063  }
064
065  @Override
066  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
067  throws IOException, InterruptedException {
068    if (baseOut == null) {
069      getBaseOutputFormat(context.getConfiguration());
070    }
071    return new LazyRecordWriter<K, V>(baseOut, context);
072  }
073  
074  @Override
075  public void checkOutputSpecs(JobContext context) 
076  throws IOException, InterruptedException {
077    if (baseOut == null) {
078      getBaseOutputFormat(context.getConfiguration());
079    }
080   super.checkOutputSpecs(context);
081  }
082  
083  @Override
084  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
085  throws IOException, InterruptedException {
086    if (baseOut == null) {
087      getBaseOutputFormat(context.getConfiguration());
088    }
089    return super.getOutputCommitter(context);
090  }
091  
092  /**
093   * A convenience class to be used with LazyOutputFormat
094   */
095  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
096
097    final OutputFormat<K,V> outputFormat;
098    final TaskAttemptContext taskContext;
099
100    public LazyRecordWriter(OutputFormat<K,V> out, 
101                            TaskAttemptContext taskContext)
102    throws IOException, InterruptedException {
103      this.outputFormat = out;
104      this.taskContext = taskContext;
105    }
106
107    @Override
108    public void write(K key, V value) throws IOException, InterruptedException {
109      if (rawWriter == null) {
110        rawWriter = outputFormat.getRecordWriter(taskContext);
111      }
112      rawWriter.write(key, value);
113    }
114
115    @Override
116    public void close(TaskAttemptContext context) 
117    throws IOException, InterruptedException {
118      if (rawWriter != null) {
119        rawWriter.close(context);
120      }
121    }
122
123  }
124}