001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.output;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.mapreduce.Job;
027    import org.apache.hadoop.mapreduce.JobContext;
028    import org.apache.hadoop.mapreduce.OutputCommitter;
029    import org.apache.hadoop.mapreduce.OutputFormat;
030    import org.apache.hadoop.mapreduce.RecordWriter;
031    import org.apache.hadoop.mapreduce.TaskAttemptContext;
032    import org.apache.hadoop.util.ReflectionUtils;
033    
034    /**
035     * A Convenience class that creates output lazily.  
036     */
037    @InterfaceAudience.Public
038    @InterfaceStability.Stable
039    public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
040      public static String OUTPUT_FORMAT = 
041        "mapreduce.output.lazyoutputformat.outputformat";
042      /**
043       * Set the underlying output format for LazyOutputFormat.
044       * @param job the {@link Job} to modify
045       * @param theClass the underlying class
046       */
047      @SuppressWarnings("unchecked")
048      public static void  setOutputFormatClass(Job job, 
049                                         Class<? extends OutputFormat> theClass) {
050          job.setOutputFormatClass(LazyOutputFormat.class);
051          job.getConfiguration().setClass(OUTPUT_FORMAT, 
052              theClass, OutputFormat.class);
053      }
054    
055      @SuppressWarnings("unchecked")
056      private void getBaseOutputFormat(Configuration conf) 
057      throws IOException {
058        baseOut =  ((OutputFormat<K, V>) ReflectionUtils.newInstance(
059          conf.getClass(OUTPUT_FORMAT, null), conf));
060        if (baseOut == null) {
061          throw new IOException("Output Format not set for LazyOutputFormat");
062        }
063      }
064    
065      @Override
066      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
067      throws IOException, InterruptedException {
068        if (baseOut == null) {
069          getBaseOutputFormat(context.getConfiguration());
070        }
071        return new LazyRecordWriter<K, V>(baseOut, context);
072      }
073      
074      @Override
075      public void checkOutputSpecs(JobContext context) 
076      throws IOException, InterruptedException {
077        if (baseOut == null) {
078          getBaseOutputFormat(context.getConfiguration());
079        }
080       super.checkOutputSpecs(context);
081      }
082      
083      @Override
084      public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
085      throws IOException, InterruptedException {
086        if (baseOut == null) {
087          getBaseOutputFormat(context.getConfiguration());
088        }
089        return super.getOutputCommitter(context);
090      }
091      
092      /**
093       * A convenience class to be used with LazyOutputFormat
094       */
095      private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
096    
097        final OutputFormat<K,V> outputFormat;
098        final TaskAttemptContext taskContext;
099    
100        public LazyRecordWriter(OutputFormat<K,V> out, 
101                                TaskAttemptContext taskContext)
102        throws IOException, InterruptedException {
103          this.outputFormat = out;
104          this.taskContext = taskContext;
105        }
106    
107        @Override
108        public void write(K key, V value) throws IOException, InterruptedException {
109          if (rawWriter == null) {
110            rawWriter = outputFormat.getRecordWriter(taskContext);
111          }
112          rawWriter.write(key, value);
113        }
114    
115        @Override
116        public void close(TaskAttemptContext context) 
117        throws IOException, InterruptedException {
118          if (rawWriter != null) {
119            rawWriter.close(context);
120          }
121        }
122    
123      }
124    }