001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.IOException;
022import java.util.PriorityQueue;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.Writable;
028import org.apache.hadoop.io.WritableComparable;
029import org.apache.hadoop.io.WritableComparator;
030import org.apache.hadoop.util.ReflectionUtils;
031
032/**
033 * Base class for Composite joins returning Tuples of arbitrary Writables.
034 */
035@InterfaceAudience.Public
036@InterfaceStability.Stable
037public abstract class JoinRecordReader<K extends WritableComparable<?>>
038    extends CompositeRecordReader<K,Writable,TupleWritable> {
039
040  public JoinRecordReader(int id, Configuration conf, int capacity,
041      Class<? extends WritableComparator> cmpcl) throws IOException {
042    super(id, capacity, cmpcl);
043    setConf(conf);
044  }
045
046  /**
047   * Emit the next set of key, value pairs as defined by the child
048   * RecordReaders and operation associated with this composite RR.
049   */
050  public boolean nextKeyValue() 
051      throws IOException, InterruptedException {
052    if (key == null) {
053      key = createKey();
054    }
055    if (jc.flush(value)) {
056      ReflectionUtils.copy(conf, jc.key(), key);
057      return true;
058    }
059    jc.clear();
060    if (value == null) {
061      value = createValue();
062    }
063    final PriorityQueue<ComposableRecordReader<K,?>> q = 
064            getRecordReaderQueue();
065    K iterkey = createKey();
066    while (q != null && !q.isEmpty()) {
067      fillJoinCollector(iterkey);
068      jc.reset(iterkey);
069      if (jc.flush(value)) {
070        ReflectionUtils.copy(conf, jc.key(), key);
071        return true;
072      }
073      jc.clear();
074    }
075    return false;
076  }
077
078  public TupleWritable createValue() {
079    return createTupleWritable();
080  }
081
082  /**
083   * Return an iterator wrapping the JoinCollector.
084   */
085  protected ResetableIterator<TupleWritable> getDelegate() {
086    return new JoinDelegationIterator();
087  }
088
089  /**
090   * Since the JoinCollector is effecting our operation, we need only
091   * provide an iterator proxy wrapping its operation.
092   */
093  protected class JoinDelegationIterator
094      implements ResetableIterator<TupleWritable> {
095
096    public boolean hasNext() {
097      return jc.hasNext();
098    }
099
100    public boolean next(TupleWritable val) throws IOException {
101      return jc.flush(val);
102    }
103
104    public boolean replay(TupleWritable val) throws IOException {
105      return jc.replay(val);
106    }
107
108    public void reset() {
109      jc.reset(jc.key());
110    }
111
112    public void add(TupleWritable item) throws IOException {
113      throw new UnsupportedOperationException();
114    }
115
116    public void close() throws IOException {
117      jc.close();
118    }
119
120    public void clear() {
121      jc.clear();
122    }
123  }
124}