001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.IOException;
022    import java.util.PriorityQueue;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.io.Writable;
028    import org.apache.hadoop.io.WritableComparable;
029    import org.apache.hadoop.io.WritableComparator;
030    import org.apache.hadoop.util.ReflectionUtils;
031    
032    /**
033     * Base class for Composite joins returning Tuples of arbitrary Writables.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Stable
037    public abstract class JoinRecordReader<K extends WritableComparable<?>>
038        extends CompositeRecordReader<K,Writable,TupleWritable> {
039    
040      public JoinRecordReader(int id, Configuration conf, int capacity,
041          Class<? extends WritableComparator> cmpcl) throws IOException {
042        super(id, capacity, cmpcl);
043        setConf(conf);
044      }
045    
046      /**
047       * Emit the next set of key, value pairs as defined by the child
048       * RecordReaders and operation associated with this composite RR.
049       */
050      public boolean nextKeyValue() 
051          throws IOException, InterruptedException {
052        if (key == null) {
053          key = createKey();
054        }
055        if (jc.flush(value)) {
056          ReflectionUtils.copy(conf, jc.key(), key);
057          return true;
058        }
059        jc.clear();
060        if (value == null) {
061          value = createValue();
062        }
063        final PriorityQueue<ComposableRecordReader<K,?>> q = 
064                getRecordReaderQueue();
065        K iterkey = createKey();
066        while (q != null && !q.isEmpty()) {
067          fillJoinCollector(iterkey);
068          jc.reset(iterkey);
069          if (jc.flush(value)) {
070            ReflectionUtils.copy(conf, jc.key(), key);
071            return true;
072          }
073          jc.clear();
074        }
075        return false;
076      }
077    
078      public TupleWritable createValue() {
079        return createTupleWritable();
080      }
081    
082      /**
083       * Return an iterator wrapping the JoinCollector.
084       */
085      protected ResetableIterator<TupleWritable> getDelegate() {
086        return new JoinDelegationIterator();
087      }
088    
089      /**
090       * Since the JoinCollector is effecting our operation, we need only
091       * provide an iterator proxy wrapping its operation.
092       */
093      protected class JoinDelegationIterator
094          implements ResetableIterator<TupleWritable> {
095    
096        public boolean hasNext() {
097          return jc.hasNext();
098        }
099    
100        public boolean next(TupleWritable val) throws IOException {
101          return jc.flush(val);
102        }
103    
104        public boolean replay(TupleWritable val) throws IOException {
105          return jc.replay(val);
106        }
107    
108        public void reset() {
109          jc.reset(jc.key());
110        }
111    
112        public void add(TupleWritable item) throws IOException {
113          throw new UnsupportedOperationException();
114        }
115    
116        public void close() throws IOException {
117          jc.close();
118        }
119    
120        public void clear() {
121          jc.clear();
122        }
123      }
124    }