001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.IOException;
022import java.util.PriorityQueue;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.Writable;
028import org.apache.hadoop.io.WritableComparable;
029import org.apache.hadoop.io.WritableComparator;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.TaskAttemptContext;
032import org.apache.hadoop.util.ReflectionUtils;
033
034/**
035 * Base class for Composite join returning values derived from multiple
036 * sources, but generally not tuples.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Stable
040public abstract class MultiFilterRecordReader<K extends WritableComparable<?>,
041                                              V extends Writable>
042    extends CompositeRecordReader<K,V,V> {
043
044  private TupleWritable ivalue = null;
045
046  public MultiFilterRecordReader(int id, Configuration conf, int capacity,
047      Class<? extends WritableComparator> cmpcl) throws IOException {
048    super(id, capacity, cmpcl);
049    setConf(conf);
050  }
051
052  /**
053   * For each tuple emitted, return a value (typically one of the values
054   * in the tuple).
055   * Modifying the Writables in the tuple is permitted and unlikely to affect
056   * join behavior in most cases, but it is not recommended. It's safer to
057   * clone first.
058   */
059  protected abstract V emit(TupleWritable dst) throws IOException;
060
061  /**
062   * Default implementation offers {@link #emit} every Tuple from the
063   * collector (the outer join of child RRs).
064   */
065  protected boolean combine(Object[] srcs, TupleWritable dst) {
066    return true;
067  }
068
069  /** {@inheritDoc} */
070  public boolean nextKeyValue() throws IOException, InterruptedException {
071    if (key == null) {
072      key = createKey();
073    }
074    if (value == null) {
075      value = createValue();
076    }
077    if (jc.flush(ivalue)) {
078      ReflectionUtils.copy(conf, jc.key(), key);
079      ReflectionUtils.copy(conf, emit(ivalue), value);
080      return true;
081    }
082    if (ivalue == null) {
083      ivalue = createTupleWritable();
084    }
085    jc.clear();
086    final PriorityQueue<ComposableRecordReader<K,?>> q = 
087            getRecordReaderQueue();
088    K iterkey = createKey();
089    while (q != null && !q.isEmpty()) {
090      fillJoinCollector(iterkey);
091      jc.reset(iterkey);
092      if (jc.flush(ivalue)) {
093        ReflectionUtils.copy(conf, jc.key(), key);
094        ReflectionUtils.copy(conf, emit(ivalue), value);
095        return true;
096      }
097      jc.clear();
098    }
099    return false;
100  }
101
102  @SuppressWarnings("unchecked")
103  public void initialize(InputSplit split, TaskAttemptContext context) 
104      throws IOException, InterruptedException {
105    super.initialize(split, context);
106  }
107
108  /**
109   * Return an iterator returning a single value from the tuple.
110   * @see MultiFilterDelegationIterator
111   */
112  protected ResetableIterator<V> getDelegate() {
113    return new MultiFilterDelegationIterator();
114  }
115
116  /**
117   * Proxy the JoinCollector, but include callback to emit.
118   */
119  protected class MultiFilterDelegationIterator
120      implements ResetableIterator<V> {
121
122    public boolean hasNext() {
123      return jc.hasNext();
124    }
125
126    public boolean next(V val) throws IOException {
127      boolean ret;
128      if (ret = jc.flush(ivalue)) {
129        ReflectionUtils.copy(getConf(), emit(ivalue), val);
130      }
131      return ret;
132    }
133
134    public boolean replay(V val) throws IOException {
135      ReflectionUtils.copy(getConf(), emit(ivalue), val);
136      return true;
137    }
138
139    public void reset() {
140      jc.reset(jc.key());
141    }
142
143    public void add(V item) throws IOException {
144      throw new UnsupportedOperationException();
145    }
146
147    public void close() throws IOException {
148      jc.close();
149    }
150
151    public void clear() {
152      jc.clear();
153    }
154  }
155
156}