001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.mapreduce.JobContext;
026import org.apache.hadoop.mapreduce.OutputCommitter;
027import org.apache.hadoop.mapreduce.OutputFormat;
028import org.apache.hadoop.mapreduce.RecordWriter;
029import org.apache.hadoop.mapreduce.TaskAttemptContext;
030
031/**
032 * FilterOutputFormat is a convenience class that wraps OutputFormat. 
033 */
034@InterfaceAudience.Public
035@InterfaceStability.Stable
036public class FilterOutputFormat <K,V> extends OutputFormat<K, V> {
037
038  protected OutputFormat<K,V> baseOut;
039
040  public FilterOutputFormat() {
041    this.baseOut = null;
042  }
043  
044  /**
045   * Create a FilterOutputFormat based on the underlying output format.
046   * @param baseOut the underlying OutputFormat
047   */
048  public FilterOutputFormat(OutputFormat<K,V> baseOut) {
049    this.baseOut = baseOut;
050  }
051
052  @Override
053  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
054  throws IOException, InterruptedException {
055    return getBaseOut().getRecordWriter(context);
056  }
057
058  @Override
059  public void checkOutputSpecs(JobContext context) 
060  throws IOException, InterruptedException {
061    getBaseOut().checkOutputSpecs(context);
062  }
063
064  @Override
065  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
066  throws IOException, InterruptedException {
067    return getBaseOut().getOutputCommitter(context);
068  }
069
070  private OutputFormat<K,V> getBaseOut() throws IOException {
071    if (baseOut == null) {
072      throw new IOException("OutputFormat not set for FilterOutputFormat");
073    }
074    return baseOut;
075  }
076  /**
077   * <code>FilterRecordWriter</code> is a convenience wrapper
078   * class that extends the {@link RecordWriter}.
079   */
080
081  public static class FilterRecordWriter<K,V> extends RecordWriter<K,V> {
082
083    protected RecordWriter<K,V> rawWriter = null;
084
085    public FilterRecordWriter() {
086      rawWriter = null;
087    }
088    
089    public FilterRecordWriter(RecordWriter<K,V> rwriter) {
090      this.rawWriter = rwriter;
091    }
092    
093    @Override
094    public void write(K key, V value) throws IOException, InterruptedException {
095      getRawWriter().write(key, value);
096    }
097
098    @Override
099    public void close(TaskAttemptContext context) 
100    throws IOException, InterruptedException {
101      getRawWriter().close(context);
102    }
103    
104    private RecordWriter<K,V> getRawWriter() throws IOException {
105      if (rawWriter == null) {
106        throw new IOException("Record Writer not set for FilterRecordWriter");
107      }
108      return rawWriter;
109    }
110  }
111}