/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.join;

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Base class for composite joins whose emitted values are derived from
 * several sources but are, in general, not tuples themselves.
 *
 * <p>Subclasses supply {@link #emit}, which turns each joined
 * {@link TupleWritable} into the single value handed to the caller.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultiFilterRecordReader<K extends WritableComparable<?>,
                                              V extends Writable>
    extends CompositeRecordReader<K,V,V> {

  /**
   * Tuple that the join collector is flushed into before it is passed to
   * {@link #emit}. Starts out {@code null} and is created lazily by
   * {@link #nextKeyValue()}.
   */
  private TupleWritable ivalue;

  /**
   * Creates the reader, forwarding {@code id}, {@code capacity} and
   * {@code cmpcl} to the superclass constructor and then installing
   * {@code conf} through {@code setConf}.
   *
   * @param id       passed through to the superclass constructor
   * @param conf     configuration installed on this reader
   * @param capacity passed through to the superclass constructor
   * @param cmpcl    comparator class passed through to the superclass
   *                 constructor
   * @throws IOException declared by this constructor; it performs no I/O of
   *                     its own beyond what the superclass constructor does
   */
  public MultiFilterRecordReader(int id, Configuration conf, int capacity,
      Class<? extends WritableComparator> cmpcl) throws IOException {
    super(id, capacity, cmpcl);
    setConf(conf);
  }

  /**
   * Produces the value to return for one emitted tuple (typically one of the
   * values held in that tuple).
   *
   * <p>Implementations are allowed to modify the Writables inside the tuple,
   * and doing so is unlikely to disturb the join in most cases, but it is
   * discouraged; cloning first is the safer choice.
   *
   * @param dst the tuple most recently flushed from the join collector
   * @return the value derived from {@code dst}
   * @throws IOException if the value cannot be produced
   */
  protected abstract V emit(TupleWritable dst) throws IOException;

  /**
   * Accepts every tuple, so that {@link #emit} is offered each tuple coming
   * out of the collector (the outer join of the child record readers).
   *
   * @param srcs ignored by this default implementation
   * @param dst  ignored by this default implementation
   * @return always {@code true}
   */
  protected boolean combine(Object[] srcs, TupleWritable dst) {
    return true;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Lazily creates the key and value holders, then first tries to drain a
   * tuple still pending in the join collector. When nothing is pending, the
   * collector is refilled from the record reader queue, one iteration at a
   * time, until a tuple can be flushed or the queue is absent or empty.
   *
   * @return {@code true} if a key/value pair was copied into the current key
   *         and value, {@code false} once no further tuple is available
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = createKey();
    }
    if (value == null) {
      value = createValue();
    }

    // Anything left over from the previous fill is served first.
    // NOTE(review): ivalue may still be null at this point on the very first
    // call; presumably flush() does not touch its argument while the
    // collector is empty -- confirm against JoinCollector.
    if (flushCollected()) {
      return true;
    }

    if (ivalue == null) {
      ivalue = createTupleWritable();
    }
    jc.clear();

    final PriorityQueue<ComposableRecordReader<K,?>> readers =
        getRecordReaderQueue();
    // Created before the queue is inspected, matching the established order
    // of calls.
    final K scratchKey = createKey();
    if (readers == null) {
      return false;
    }
    while (!readers.isEmpty()) {
      fillJoinCollector(scratchKey);
      jc.reset(scratchKey);
      if (flushCollected()) {
        return true;
      }
      // This fill yielded nothing; discard it and try the next one.
      jc.clear();
    }
    return false;
  }

  /**
   * Flushes the join collector into {@link #ivalue} and, on success, copies
   * the collector's key into {@code key} and the emitted value into
   * {@code value}.
   *
   * @return {@code true} if the collector flushed a tuple and both copies
   *         were made, {@code false} if the collector had nothing to flush
   * @throws IOException if flushing, emitting or copying fails
   */
  private boolean flushCollected() throws IOException {
    if (!jc.flush(ivalue)) {
      return false;
    }
    ReflectionUtils.copy(conf, jc.key(), key);
    ReflectionUtils.copy(conf, emit(ivalue), value);
    return true;
  }

  /**
   * Initializes this reader by delegating to the superclass implementation.
   *
   * @param split   the split handed on to the superclass
   * @param context the task attempt context handed on to the superclass
   */
  @SuppressWarnings("unchecked")
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    super.initialize(split, context);
  }

  /**
   * Supplies an iterator that yields a single value per tuple.
   *
   * @return a fresh {@link MultiFilterDelegationIterator}
   * @see MultiFilterDelegationIterator
   */
  protected ResetableIterator<V> getDelegate() {
    return new MultiFilterDelegationIterator();
  }

  /**
   * Proxies the join collector while routing every tuple through
   * {@link MultiFilterRecordReader#emit} before it reaches the caller.
   */
  protected class MultiFilterDelegationIterator
      implements ResetableIterator<V> {

    /** Reports whether the join collector has more to offer. */
    public boolean hasNext() {
      return jc.hasNext();
    }

    /**
     * Flushes the next tuple from the join collector and, if there was one,
     * copies the emitted value into {@code val}.
     *
     * @param val destination for the emitted value
     * @return {@code true} if a tuple was flushed and {@code val} was
     *         populated, {@code false} otherwise
     */
    public boolean next(V val) throws IOException {
      if (!jc.flush(ivalue)) {
        return false;
      }
      ReflectionUtils.copy(getConf(), emit(ivalue), val);
      return true;
    }

    /**
     * Re-emits the current tuple into {@code val} without flushing the join
     * collector again.
     *
     * @param val destination for the emitted value
     * @return always {@code true}
     */
    public boolean replay(V val) throws IOException {
      ReflectionUtils.copy(getConf(), emit(ivalue), val);
      return true;
    }

    /** Resets the join collector using its own current key. */
    public void reset() {
      jc.reset(jc.key());
    }

    /**
     * Unsupported by this iterator.
     *
     * @throws UnsupportedOperationException always
     */
    public void add(V item) throws IOException {
      throw new UnsupportedOperationException();
    }

    /** Closes the underlying join collector. */
    public void close() throws IOException {
      jc.close();
    }

    /** Clears the underlying join collector. */
    public void clear() {
      jc.clear();
    }
  }

}