001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.IOException;
022    import java.util.PriorityQueue;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.io.Writable;
028    import org.apache.hadoop.io.WritableComparable;
029    import org.apache.hadoop.io.WritableComparator;
030    import org.apache.hadoop.mapreduce.InputSplit;
031    import org.apache.hadoop.mapreduce.TaskAttemptContext;
032    import org.apache.hadoop.util.ReflectionUtils;
033    
034    /**
035     * Base class for Composite join returning values derived from multiple
036     * sources, but generally not tuples.
037     */
038    @InterfaceAudience.Public
039    @InterfaceStability.Stable
040    public abstract class MultiFilterRecordReader<K extends WritableComparable<?>,
041                                                  V extends Writable>
042        extends CompositeRecordReader<K,V,V> {
043    
044      private TupleWritable ivalue = null;
045    
046      public MultiFilterRecordReader(int id, Configuration conf, int capacity,
047          Class<? extends WritableComparator> cmpcl) throws IOException {
048        super(id, capacity, cmpcl);
049        setConf(conf);
050      }
051    
052      /**
053       * For each tuple emitted, return a value (typically one of the values
054       * in the tuple).
055       * Modifying the Writables in the tuple is permitted and unlikely to affect
056       * join behavior in most cases, but it is not recommended. It's safer to
057       * clone first.
058       */
059      protected abstract V emit(TupleWritable dst) throws IOException;
060    
061      /**
062       * Default implementation offers {@link #emit} every Tuple from the
063       * collector (the outer join of child RRs).
064       */
065      protected boolean combine(Object[] srcs, TupleWritable dst) {
066        return true;
067      }
068    
069      /** {@inheritDoc} */
070      public boolean nextKeyValue() throws IOException, InterruptedException {
071        if (key == null) {
072          key = createKey();
073        }
074        if (value == null) {
075          value = createValue();
076        }
077        if (jc.flush(ivalue)) {
078          ReflectionUtils.copy(conf, jc.key(), key);
079          ReflectionUtils.copy(conf, emit(ivalue), value);
080          return true;
081        }
082        if (ivalue == null) {
083          ivalue = createTupleWritable();
084        }
085        jc.clear();
086        final PriorityQueue<ComposableRecordReader<K,?>> q = 
087                getRecordReaderQueue();
088        K iterkey = createKey();
089        while (q != null && !q.isEmpty()) {
090          fillJoinCollector(iterkey);
091          jc.reset(iterkey);
092          if (jc.flush(ivalue)) {
093            ReflectionUtils.copy(conf, jc.key(), key);
094            ReflectionUtils.copy(conf, emit(ivalue), value);
095            return true;
096          }
097          jc.clear();
098        }
099        return false;
100      }
101    
102      @SuppressWarnings("unchecked")
103      public void initialize(InputSplit split, TaskAttemptContext context) 
104          throws IOException, InterruptedException {
105        super.initialize(split, context);
106      }
107    
108      /**
109       * Return an iterator returning a single value from the tuple.
110       * @see MultiFilterDelegationIterator
111       */
112      protected ResetableIterator<V> getDelegate() {
113        return new MultiFilterDelegationIterator();
114      }
115    
116      /**
117       * Proxy the JoinCollector, but include callback to emit.
118       */
119      protected class MultiFilterDelegationIterator
120          implements ResetableIterator<V> {
121    
122        public boolean hasNext() {
123          return jc.hasNext();
124        }
125    
126        public boolean next(V val) throws IOException {
127          boolean ret;
128          if (ret = jc.flush(ivalue)) {
129            ReflectionUtils.copy(getConf(), emit(ivalue), val);
130          }
131          return ret;
132        }
133    
134        public boolean replay(V val) throws IOException {
135          ReflectionUtils.copy(getConf(), emit(ivalue), val);
136          return true;
137        }
138    
139        public void reset() {
140          jc.reset(jc.key());
141        }
142    
143        public void add(V item) throws IOException {
144          throw new UnsupportedOperationException();
145        }
146    
147        public void close() throws IOException {
148          jc.close();
149        }
150    
151        public void clear() {
152          jc.clear();
153        }
154      }
155    
156    }