001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.join;
020    
021    import java.io.IOException;
022    import java.util.PriorityQueue;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.io.Writable;
027    import org.apache.hadoop.io.WritableComparable;
028    import org.apache.hadoop.io.WritableComparator;
029    import org.apache.hadoop.io.WritableUtils;
030    import org.apache.hadoop.util.ReflectionUtils;
031    import org.apache.hadoop.mapred.JobConf;
032    import org.apache.hadoop.mapred.RecordReader;
033    
034    /**
035     * Base class for Composite join returning values derived from multiple
036     * sources, but generally not tuples.
037     */
038    @InterfaceAudience.Public
039    @InterfaceStability.Stable
040    public abstract class MultiFilterRecordReader<K extends WritableComparable,
041                                                  V extends Writable>
042        extends CompositeRecordReader<K,V,V>
043        implements ComposableRecordReader<K,V> {
044    
045      private Class<? extends Writable> valueclass;
046      private TupleWritable ivalue;
047    
048      public MultiFilterRecordReader(int id, JobConf conf, int capacity,
049          Class<? extends WritableComparator> cmpcl) throws IOException {
050        super(id, capacity, cmpcl);
051        setConf(conf);
052      }
053    
054      /**
055       * For each tuple emitted, return a value (typically one of the values
056       * in the tuple).
057       * Modifying the Writables in the tuple is permitted and unlikely to affect
058       * join behavior in most cases, but it is not recommended. It's safer to
059       * clone first.
060       */
061      protected abstract V emit(TupleWritable dst) throws IOException;
062    
063      /**
064       * Default implementation offers {@link #emit} every Tuple from the
065       * collector (the outer join of child RRs).
066       */
067      protected boolean combine(Object[] srcs, TupleWritable dst) {
068        return true;
069      }
070    
071      /** {@inheritDoc} */
072      public boolean next(K key, V value) throws IOException {
073        if (jc.flush(ivalue)) {
074          WritableUtils.cloneInto(key, jc.key());
075          WritableUtils.cloneInto(value, emit(ivalue));
076          return true;
077        }
078        jc.clear();
079        K iterkey = createKey();
080        final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
081        while (!q.isEmpty()) {
082          fillJoinCollector(iterkey);
083          jc.reset(iterkey);
084          if (jc.flush(ivalue)) {
085            WritableUtils.cloneInto(key, jc.key());
086            WritableUtils.cloneInto(value, emit(ivalue));
087            return true;
088          }
089          jc.clear();
090        }
091        return false;
092      }
093    
094      /** {@inheritDoc} */
095      @SuppressWarnings("unchecked") // Explicit check for value class agreement
096      public V createValue() {
097        if (null == valueclass) {
098          final Class<?> cls = kids[0].createValue().getClass();
099          for (RecordReader<K,? extends V> rr : kids) {
100            if (!cls.equals(rr.createValue().getClass())) {
101              throw new ClassCastException("Child value classes fail to agree");
102            }
103          }
104          valueclass = cls.asSubclass(Writable.class);
105          ivalue = createInternalValue();
106        }
107        return (V) ReflectionUtils.newInstance(valueclass, null);
108      }
109    
110      /**
111       * Return an iterator returning a single value from the tuple.
112       * @see MultiFilterDelegationIterator
113       */
114      protected ResetableIterator<V> getDelegate() {
115        return new MultiFilterDelegationIterator();
116      }
117    
118      /**
119       * Proxy the JoinCollector, but include callback to emit.
120       */
121      protected class MultiFilterDelegationIterator
122          implements ResetableIterator<V> {
123    
124        public boolean hasNext() {
125          return jc.hasNext();
126        }
127    
128        public boolean next(V val) throws IOException {
129          boolean ret;
130          if (ret = jc.flush(ivalue)) {
131            WritableUtils.cloneInto(val, emit(ivalue));
132          }
133          return ret;
134        }
135    
136        public boolean replay(V val) throws IOException {
137          WritableUtils.cloneInto(val, emit(ivalue));
138          return true;
139        }
140    
141        public void reset() {
142          jc.reset(jc.key());
143        }
144    
145        public void add(V item) throws IOException {
146          throw new UnsupportedOperationException();
147        }
148    
149        public void close() throws IOException {
150          jc.close();
151        }
152    
153        public void clear() {
154          jc.clear();
155        }
156      }
157    
158    }