001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.join; 020 021import java.io.IOException; 022import java.util.PriorityQueue; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.io.Writable; 027import org.apache.hadoop.io.WritableComparable; 028import org.apache.hadoop.io.WritableComparator; 029import org.apache.hadoop.io.WritableUtils; 030import org.apache.hadoop.mapred.JobConf; 031 032/** 033 * Base class for Composite joins returning Tuples of arbitrary Writables. 034 */ 035@InterfaceAudience.Public 036@InterfaceStability.Stable 037public abstract class JoinRecordReader<K extends WritableComparable> 038 extends CompositeRecordReader<K,Writable,TupleWritable> 039 implements ComposableRecordReader<K,TupleWritable> { 040 041 public JoinRecordReader(int id, JobConf conf, int capacity, 042 Class<? extends WritableComparator> cmpcl) throws IOException { 043 super(id, capacity, cmpcl); 044 setConf(conf); 045 } 046 047 /** 048 * Emit the next set of key, value pairs as defined by the child 049 * RecordReaders and operation associated with this composite RR. 050 */ 051 public boolean next(K key, TupleWritable value) throws IOException { 052 if (jc.flush(value)) { 053 WritableUtils.cloneInto(key, jc.key()); 054 return true; 055 } 056 jc.clear(); 057 K iterkey = createKey(); 058 final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue(); 059 while (!q.isEmpty()) { 060 fillJoinCollector(iterkey); 061 jc.reset(iterkey); 062 if (jc.flush(value)) { 063 WritableUtils.cloneInto(key, jc.key()); 064 return true; 065 } 066 jc.clear(); 067 } 068 return false; 069 } 070 071 /** {@inheritDoc} */ 072 public TupleWritable createValue() { 073 return createInternalValue(); 074 } 075 076 /** 077 * Return an iterator wrapping the JoinCollector. 078 */ 079 protected ResetableIterator<TupleWritable> getDelegate() { 080 return new JoinDelegationIterator(); 081 } 082 083 /** 084 * Since the JoinCollector is effecting our operation, we need only 085 * provide an iterator proxy wrapping its operation. 086 */ 087 protected class JoinDelegationIterator 088 implements ResetableIterator<TupleWritable> { 089 090 public boolean hasNext() { 091 return jc.hasNext(); 092 } 093 094 public boolean next(TupleWritable val) throws IOException { 095 return jc.flush(val); 096 } 097 098 public boolean replay(TupleWritable val) throws IOException { 099 return jc.replay(val); 100 } 101 102 public void reset() { 103 jc.reset(jc.key()); 104 } 105 106 public void add(TupleWritable item) throws IOException { 107 throw new UnsupportedOperationException(); 108 } 109 110 public void close() throws IOException { 111 jc.close(); 112 } 113 114 public void clear() { 115 jc.clear(); 116 } 117 } 118}