001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022import java.util.PriorityQueue;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.io.Writable;
027import org.apache.hadoop.io.WritableComparable;
028import org.apache.hadoop.io.WritableComparator;
029import org.apache.hadoop.io.WritableUtils;
030import org.apache.hadoop.util.ReflectionUtils;
031import org.apache.hadoop.mapred.JobConf;
032import org.apache.hadoop.mapred.RecordReader;
033
034/**
035 * Base class for Composite join returning values derived from multiple
036 * sources, but generally not tuples.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Stable
040public abstract class MultiFilterRecordReader<K extends WritableComparable,
041                                              V extends Writable>
042    extends CompositeRecordReader<K,V,V>
043    implements ComposableRecordReader<K,V> {
044
045  private Class<? extends Writable> valueclass;
046  private TupleWritable ivalue;
047
048  public MultiFilterRecordReader(int id, JobConf conf, int capacity,
049      Class<? extends WritableComparator> cmpcl) throws IOException {
050    super(id, capacity, cmpcl);
051    setConf(conf);
052  }
053
054  /**
055   * For each tuple emitted, return a value (typically one of the values
056   * in the tuple).
057   * Modifying the Writables in the tuple is permitted and unlikely to affect
058   * join behavior in most cases, but it is not recommended. It's safer to
059   * clone first.
060   */
061  protected abstract V emit(TupleWritable dst) throws IOException;
062
063  /**
064   * Default implementation offers {@link #emit} every Tuple from the
065   * collector (the outer join of child RRs).
066   */
067  protected boolean combine(Object[] srcs, TupleWritable dst) {
068    return true;
069  }
070
071  /** {@inheritDoc} */
072  public boolean next(K key, V value) throws IOException {
073    if (jc.flush(ivalue)) {
074      WritableUtils.cloneInto(key, jc.key());
075      WritableUtils.cloneInto(value, emit(ivalue));
076      return true;
077    }
078    jc.clear();
079    K iterkey = createKey();
080    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
081    while (!q.isEmpty()) {
082      fillJoinCollector(iterkey);
083      jc.reset(iterkey);
084      if (jc.flush(ivalue)) {
085        WritableUtils.cloneInto(key, jc.key());
086        WritableUtils.cloneInto(value, emit(ivalue));
087        return true;
088      }
089      jc.clear();
090    }
091    return false;
092  }
093
094  /** {@inheritDoc} */
095  @SuppressWarnings("unchecked") // Explicit check for value class agreement
096  public V createValue() {
097    if (null == valueclass) {
098      final Class<?> cls = kids[0].createValue().getClass();
099      for (RecordReader<K,? extends V> rr : kids) {
100        if (!cls.equals(rr.createValue().getClass())) {
101          throw new ClassCastException("Child value classes fail to agree");
102        }
103      }
104      valueclass = cls.asSubclass(Writable.class);
105      ivalue = createInternalValue();
106    }
107    return (V) ReflectionUtils.newInstance(valueclass, null);
108  }
109
110  /**
111   * Return an iterator returning a single value from the tuple.
112   * @see MultiFilterDelegationIterator
113   */
114  protected ResetableIterator<V> getDelegate() {
115    return new MultiFilterDelegationIterator();
116  }
117
118  /**
119   * Proxy the JoinCollector, but include callback to emit.
120   */
121  protected class MultiFilterDelegationIterator
122      implements ResetableIterator<V> {
123
124    public boolean hasNext() {
125      return jc.hasNext();
126    }
127
128    public boolean next(V val) throws IOException {
129      boolean ret;
130      if (ret = jc.flush(ivalue)) {
131        WritableUtils.cloneInto(val, emit(ivalue));
132      }
133      return ret;
134    }
135
136    public boolean replay(V val) throws IOException {
137      WritableUtils.cloneInto(val, emit(ivalue));
138      return true;
139    }
140
141    public void reset() {
142      jc.reset(jc.key());
143    }
144
145    public void add(V item) throws IOException {
146      throw new UnsupportedOperationException();
147    }
148
149    public void close() throws IOException {
150      jc.close();
151    }
152
153    public void clear() {
154      jc.clear();
155    }
156  }
157
158}