001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.mapred.JobConf;
027import org.apache.hadoop.mapred.OutputFormat;
028import org.apache.hadoop.mapred.RecordWriter;
029import org.apache.hadoop.mapred.Reporter;
030import org.apache.hadoop.util.Progressable;
031import org.apache.hadoop.util.ReflectionUtils;
032
033/**
034 * A Convenience class that creates output lazily. 
035 */
036@InterfaceAudience.Public
037@InterfaceStability.Stable
038public class LazyOutputFormat<K, V> extends FilterOutputFormat<K, V> {
039  /**
040   * Set the underlying output format for LazyOutputFormat.
041   * @param job the {@link JobConf} to modify
042   * @param theClass the underlying class
043   */
044  @SuppressWarnings("unchecked")
045  public static void  setOutputFormatClass(JobConf job, 
046      Class<? extends OutputFormat> theClass) {
047      job.setOutputFormat(LazyOutputFormat.class);
048      job.setClass("mapreduce.output.lazyoutputformat.outputformat", theClass, OutputFormat.class);
049  }
050
051  @Override
052  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 
053      String name, Progressable progress) throws IOException {
054    if (baseOut == null) {
055      getBaseOutputFormat(job);
056    }
057    return new LazyRecordWriter<K, V>(job, baseOut, name, progress);
058  }
059
060  @Override
061  public void checkOutputSpecs(FileSystem ignored, JobConf job) 
062  throws IOException {
063    if (baseOut == null) {
064      getBaseOutputFormat(job);
065    }
066    super.checkOutputSpecs(ignored, job);
067  }
068
069  @SuppressWarnings("unchecked")
070  private void getBaseOutputFormat(JobConf job) throws IOException {
071    baseOut = ReflectionUtils.newInstance(
072        job.getClass("mapreduce.output.lazyoutputformat.outputformat", null, OutputFormat.class), 
073        job); 
074    if (baseOut == null) {
075      throw new IOException("Ouput format not set for LazyOutputFormat");
076    }
077  }
078  
079  /**
080   * <code>LazyRecordWriter</code> is a convenience 
081   * class that works with LazyOutputFormat.
082   */
083
084  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
085
086    final OutputFormat of;
087    final String name;
088    final Progressable progress;
089    final JobConf job;
090
091    public LazyRecordWriter(JobConf job, OutputFormat of, String name,
092        Progressable progress)  throws IOException {
093      this.of = of;
094      this.job = job;
095      this.name = name;
096      this.progress = progress;
097    }
098
099    @Override
100    public void close(Reporter reporter) throws IOException {
101      if (rawWriter != null) {
102        rawWriter.close(reporter);
103      }
104    }
105
106    @Override
107    public void write(K key, V value) throws IOException {
108      if (rawWriter == null) {
109        createRecordWriter();
110      }
111      super.write(key, value);
112    }
113
114    @SuppressWarnings("unchecked")
115    private void createRecordWriter() throws IOException {
116      FileSystem fs = FileSystem.get(job);
117      rawWriter = of.getRecordWriter(fs, job, name, progress);
118    }  
119  }
120}