001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.output;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.mapreduce.JobContext;
026    import org.apache.hadoop.mapreduce.OutputCommitter;
027    import org.apache.hadoop.mapreduce.OutputFormat;
028    import org.apache.hadoop.mapreduce.RecordWriter;
029    import org.apache.hadoop.mapreduce.TaskAttemptContext;
030    
031    /**
032     * FilterOutputFormat is a convenience class that wraps OutputFormat. 
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Stable
036    public class FilterOutputFormat <K,V> extends OutputFormat<K, V> {
037    
038      protected OutputFormat<K,V> baseOut;
039    
040      public FilterOutputFormat() {
041        this.baseOut = null;
042      }
043      
044      /**
045       * Create a FilterOutputFormat based on the underlying output format.
046       * @param baseOut the underlying OutputFormat
047       */
048      public FilterOutputFormat(OutputFormat<K,V> baseOut) {
049        this.baseOut = baseOut;
050      }
051    
052      @Override
053      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
054      throws IOException, InterruptedException {
055        return getBaseOut().getRecordWriter(context);
056      }
057    
058      @Override
059      public void checkOutputSpecs(JobContext context) 
060      throws IOException, InterruptedException {
061        getBaseOut().checkOutputSpecs(context);
062      }
063    
064      @Override
065      public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
066      throws IOException, InterruptedException {
067        return getBaseOut().getOutputCommitter(context);
068      }
069    
070      private OutputFormat<K,V> getBaseOut() throws IOException {
071        if (baseOut == null) {
072          throw new IOException("OutputFormat not set for FilterOutputFormat");
073        }
074        return baseOut;
075      }
076      /**
077       * <code>FilterRecordWriter</code> is a convenience wrapper
078       * class that extends the {@link RecordWriter}.
079       */
080    
081      public static class FilterRecordWriter<K,V> extends RecordWriter<K,V> {
082    
083        protected RecordWriter<K,V> rawWriter = null;
084    
085        public FilterRecordWriter() {
086          rawWriter = null;
087        }
088        
089        public FilterRecordWriter(RecordWriter<K,V> rwriter) {
090          this.rawWriter = rwriter;
091        }
092        
093        @Override
094        public void write(K key, V value) throws IOException, InterruptedException {
095          getRawWriter().write(key, value);
096        }
097    
098        @Override
099        public void close(TaskAttemptContext context) 
100        throws IOException, InterruptedException {
101          getRawWriter().close(context);
102        }
103        
104        private RecordWriter<K,V> getRawWriter() throws IOException {
105          if (rawWriter == null) {
106            throw new IOException("Record Writer not set for FilterRecordWriter");
107          }
108          return rawWriter;
109        }
110      }
111    }