001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.mapreduce.Job;
027import org.apache.hadoop.mapreduce.JobContext;
028import org.apache.hadoop.mapreduce.OutputCommitter;
029import org.apache.hadoop.mapreduce.OutputFormat;
030import org.apache.hadoop.mapreduce.RecordWriter;
031import org.apache.hadoop.mapreduce.TaskAttemptContext;
032import org.apache.hadoop.util.ReflectionUtils;
033
034/**
035 * A Convenience class that creates output lazily.
036 * Use in conjuction with org.apache.hadoop.mapreduce.lib.output.MultipleOutputs to recreate the
037 * behaviour of org.apache.hadoop.mapred.lib.MultipleTextOutputFormat (etc) of the old Hadoop API.
038 * See {@link MultipleOutputs} documentation for more information.
039 */
040@InterfaceAudience.Public
041@InterfaceStability.Stable
042public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
043  public static String OUTPUT_FORMAT = 
044    "mapreduce.output.lazyoutputformat.outputformat";
045  /**
046   * Set the underlying output format for LazyOutputFormat.
047   * @param job the {@link Job} to modify
048   * @param theClass the underlying class
049   */
050  @SuppressWarnings("unchecked")
051  public static void  setOutputFormatClass(Job job, 
052                                     Class<? extends OutputFormat> theClass) {
053      job.setOutputFormatClass(LazyOutputFormat.class);
054      job.getConfiguration().setClass(OUTPUT_FORMAT, 
055          theClass, OutputFormat.class);
056  }
057
058  @SuppressWarnings("unchecked")
059  private void getBaseOutputFormat(Configuration conf) 
060  throws IOException {
061    baseOut =  ((OutputFormat<K, V>) ReflectionUtils.newInstance(
062      conf.getClass(OUTPUT_FORMAT, null), conf));
063    if (baseOut == null) {
064      throw new IOException("Output Format not set for LazyOutputFormat");
065    }
066  }
067
068  @Override
069  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
070  throws IOException, InterruptedException {
071    if (baseOut == null) {
072      getBaseOutputFormat(context.getConfiguration());
073    }
074    return new LazyRecordWriter<K, V>(baseOut, context);
075  }
076  
077  @Override
078  public void checkOutputSpecs(JobContext context) 
079  throws IOException, InterruptedException {
080    if (baseOut == null) {
081      getBaseOutputFormat(context.getConfiguration());
082    }
083   super.checkOutputSpecs(context);
084  }
085  
086  @Override
087  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
088  throws IOException, InterruptedException {
089    if (baseOut == null) {
090      getBaseOutputFormat(context.getConfiguration());
091    }
092    return super.getOutputCommitter(context);
093  }
094  
095  /**
096   * A convenience class to be used with LazyOutputFormat
097   */
098  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
099
100    final OutputFormat<K,V> outputFormat;
101    final TaskAttemptContext taskContext;
102
103    public LazyRecordWriter(OutputFormat<K,V> out, 
104                            TaskAttemptContext taskContext)
105    throws IOException, InterruptedException {
106      this.outputFormat = out;
107      this.taskContext = taskContext;
108    }
109
110    @Override
111    public void write(K key, V value) throws IOException, InterruptedException {
112      if (rawWriter == null) {
113        rawWriter = outputFormat.getRecordWriter(taskContext);
114      }
115      rawWriter.write(key, value);
116    }
117
118    @Override
119    public void close(TaskAttemptContext context) 
120    throws IOException, InterruptedException {
121      if (rawWriter != null) {
122        rawWriter.close(context);
123      }
124    }
125
126  }
127}