001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.output;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.mapreduce.Job;
027    import org.apache.hadoop.mapreduce.JobContext;
028    import org.apache.hadoop.mapreduce.OutputCommitter;
029    import org.apache.hadoop.mapreduce.OutputFormat;
030    import org.apache.hadoop.mapreduce.RecordWriter;
031    import org.apache.hadoop.mapreduce.TaskAttemptContext;
032    import org.apache.hadoop.util.ReflectionUtils;
033    
034    /**
035     * A Convenience class that creates output lazily.
036     * Use in conjuction with org.apache.hadoop.mapreduce.lib.output.MultipleOutputs to recreate the
037     * behaviour of org.apache.hadoop.mapred.lib.MultipleTextOutputFormat (etc) of the old Hadoop API.
038     * See {@link MultipleOutputs} documentation for more information.
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
043      public static String OUTPUT_FORMAT = 
044        "mapreduce.output.lazyoutputformat.outputformat";
045      /**
046       * Set the underlying output format for LazyOutputFormat.
047       * @param job the {@link Job} to modify
048       * @param theClass the underlying class
049       */
050      @SuppressWarnings("unchecked")
051      public static void  setOutputFormatClass(Job job, 
052                                         Class<? extends OutputFormat> theClass) {
053          job.setOutputFormatClass(LazyOutputFormat.class);
054          job.getConfiguration().setClass(OUTPUT_FORMAT, 
055              theClass, OutputFormat.class);
056      }
057    
058      @SuppressWarnings("unchecked")
059      private void getBaseOutputFormat(Configuration conf) 
060      throws IOException {
061        baseOut =  ((OutputFormat<K, V>) ReflectionUtils.newInstance(
062          conf.getClass(OUTPUT_FORMAT, null), conf));
063        if (baseOut == null) {
064          throw new IOException("Output Format not set for LazyOutputFormat");
065        }
066      }
067    
068      @Override
069      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
070      throws IOException, InterruptedException {
071        if (baseOut == null) {
072          getBaseOutputFormat(context.getConfiguration());
073        }
074        return new LazyRecordWriter<K, V>(baseOut, context);
075      }
076      
077      @Override
078      public void checkOutputSpecs(JobContext context) 
079      throws IOException, InterruptedException {
080        if (baseOut == null) {
081          getBaseOutputFormat(context.getConfiguration());
082        }
083       super.checkOutputSpecs(context);
084      }
085      
086      @Override
087      public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
088      throws IOException, InterruptedException {
089        if (baseOut == null) {
090          getBaseOutputFormat(context.getConfiguration());
091        }
092        return super.getOutputCommitter(context);
093      }
094      
095      /**
096       * A convenience class to be used with LazyOutputFormat
097       */
098      private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
099    
100        final OutputFormat<K,V> outputFormat;
101        final TaskAttemptContext taskContext;
102    
103        public LazyRecordWriter(OutputFormat<K,V> out, 
104                                TaskAttemptContext taskContext)
105        throws IOException, InterruptedException {
106          this.outputFormat = out;
107          this.taskContext = taskContext;
108        }
109    
110        @Override
111        public void write(K key, V value) throws IOException, InterruptedException {
112          if (rawWriter == null) {
113            rawWriter = outputFormat.getRecordWriter(taskContext);
114          }
115          rawWriter.write(key, value);
116        }
117    
118        @Override
119        public void close(TaskAttemptContext context) 
120        throws IOException, InterruptedException {
121          if (rawWriter != null) {
122            rawWriter.close(context);
123          }
124        }
125    
126      }
127    }