001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022import java.util.PriorityQueue;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.io.Writable;
027import org.apache.hadoop.io.WritableComparable;
028import org.apache.hadoop.io.WritableComparator;
029import org.apache.hadoop.io.WritableUtils;
030import org.apache.hadoop.mapred.JobConf;
031
032/**
033 * Base class for Composite joins returning Tuples of arbitrary Writables.
034 */
035@InterfaceAudience.Public
036@InterfaceStability.Stable
037public abstract class JoinRecordReader<K extends WritableComparable>
038    extends CompositeRecordReader<K,Writable,TupleWritable>
039    implements ComposableRecordReader<K,TupleWritable> {
040
041  public JoinRecordReader(int id, JobConf conf, int capacity,
042      Class<? extends WritableComparator> cmpcl) throws IOException {
043    super(id, capacity, cmpcl);
044    setConf(conf);
045  }
046
047  /**
048   * Emit the next set of key, value pairs as defined by the child
049   * RecordReaders and operation associated with this composite RR.
050   */
051  public boolean next(K key, TupleWritable value) throws IOException {
052    if (jc.flush(value)) {
053      WritableUtils.cloneInto(key, jc.key());
054      return true;
055    }
056    jc.clear();
057    K iterkey = createKey();
058    final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
059    while (!q.isEmpty()) {
060      fillJoinCollector(iterkey);
061      jc.reset(iterkey);
062      if (jc.flush(value)) {
063        WritableUtils.cloneInto(key, jc.key());
064        return true;
065      }
066      jc.clear();
067    }
068    return false;
069  }
070
071  /** {@inheritDoc} */
072  public TupleWritable createValue() {
073    return createInternalValue();
074  }
075
076  /**
077   * Return an iterator wrapping the JoinCollector.
078   */
079  protected ResetableIterator<TupleWritable> getDelegate() {
080    return new JoinDelegationIterator();
081  }
082
083  /**
084   * Since the JoinCollector is effecting our operation, we need only
085   * provide an iterator proxy wrapping its operation.
086   */
087  protected class JoinDelegationIterator
088      implements ResetableIterator<TupleWritable> {
089
090    public boolean hasNext() {
091      return jc.hasNext();
092    }
093
094    public boolean next(TupleWritable val) throws IOException {
095      return jc.flush(val);
096    }
097
098    public boolean replay(TupleWritable val) throws IOException {
099      return jc.replay(val);
100    }
101
102    public void reset() {
103      jc.reset(jc.key());
104    }
105
106    public void add(TupleWritable item) throws IOException {
107      throw new UnsupportedOperationException();
108    }
109
110    public void close() throws IOException {
111      jc.close();
112    }
113
114    public void clear() {
115      jc.clear();
116    }
117  }
118}