/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.join;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Proxy class for a RecordReader participating in the join framework.
 *
 * This class keeps track of the "head" key-value pair for the
 * provided RecordReader and keeps a store of values matching a key when
 * this source is participating in a join.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WrappedRecordReader<K extends WritableComparable<?>,
    U extends Writable> extends ComposableRecordReader<K,U> {

  /** Set to true once the proxied RR reports that it has no further pairs. */
  protected boolean empty = false;
  private RecordReader<K,U> rr;
  private int id;  // index at which values will be inserted in collector

  /** Comparator used to order and match keys; may be null until initialize. */
  protected WritableComparator cmp = null;
  private K key; // key at the top of this RR
  private U value; // value assoc with key
  private ResetableIterator<U> vjoin; // values collected by the last accept
  private Configuration conf = new Configuration();
  @SuppressWarnings("unchecked")
  private Class<? extends WritableComparable> keyclass = null;
  private Class<? extends Writable> valueclass = null;

  /**
   * Create a reader occupying position id in the collector without a
   * proxied RecordReader or comparator; only the value store is set up.
   * NOTE(review): presumably intended for subclasses that provide their own
   * source of records - confirm against subclasses.
   * @param id index at which values will be inserted in the collector
   */
  protected WrappedRecordReader(int id) {
    this.id = id;
    vjoin = new StreamBackedIterator<U>();
  }

  /**
   * For a given RecordReader rr, occupy position id in collector.
   * @param id index at which values will be inserted in the collector
   * @param rr the RecordReader to proxy
   * @param cmpcl comparator class to instantiate for key comparison, or
   *              null to have a default comparator selected for the key
   *              class during {@link #initialize}
   * @throws IOException if the comparator cannot be instantiated
   */
  WrappedRecordReader(int id, RecordReader<K,U> rr,
      Class<? extends WritableComparator> cmpcl)
      throws IOException, InterruptedException {
    this.id = id;
    this.rr = rr;
    if (cmpcl != null) {
      try {
        // Class.newInstance() is deprecated and rethrows checked exceptions
        // from the constructor undeclared; go through the Constructor.
        this.cmp = cmpcl.getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException e) {
        throw new IOException(e);
      } catch (InstantiationException e) {
        throw new IOException(e);
      } catch (IllegalAccessException e) {
        throw new IOException(e);
      } catch (InvocationTargetException e) {
        // Unchecked failures from the comparator's constructor propagate
        // unchanged, as they did before; checked ones are wrapped.
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        }
        if (cause instanceof Error) {
          throw (Error) cause;
        }
        throw new IOException(cause);
      }
    }
    vjoin = new StreamBackedIterator<U>();
  }

  /**
   * Initialize the proxied RR, adopt the task configuration and read the
   * first k,v pair into the head of this object. If a pair was read, the
   * key and value classes are recorded from it and, when no comparator was
   * supplied at construction, one is looked up for the key class.
   */
  @Override
  public void initialize(InputSplit split,
                         TaskAttemptContext context)
      throws IOException, InterruptedException {
    rr.initialize(split, context);
    conf = context.getConfiguration();
    nextKeyValue();
    if (!empty) {
      keyclass = key.getClass().asSubclass(WritableComparable.class);
      valueclass = value.getClass();
      if (cmp == null) {
        cmp = WritableComparator.get(keyclass, conf);
      }
    }
  }

  /**
   * Create a new key of the class observed at the head of the proxied RR.
   * @return a new key instance, or the NullWritable singleton if no key
   *         class has been recorded
   */
  @SuppressWarnings("unchecked")
  public K createKey() {
    if (keyclass != null) {
      return (K) ReflectionUtils.newInstance(keyclass, conf);
    }
    return (K) NullWritable.get();
  }

  /**
   * Create a new value of the class observed at the head of the proxied RR.
   * @return a new value instance, or the NullWritable singleton if no value
   *         class has been recorded
   */
  @SuppressWarnings("unchecked")
  public U createValue() {
    if (valueclass != null) {
      return (U) ReflectionUtils.newInstance(valueclass, conf);
    }
    return (U) NullWritable.get();
  }

  /** {@inheritDoc} */
  public int id() {
    return id;
  }

  /**
   * Return the key at the head of this RR.
   */
  public K key() {
    return key;
  }

  /**
   * Clone the key at the head of this RR into the object supplied.
   * @param qkey destination object that receives a copy of the head key
   */
  public void key(K qkey) throws IOException {
    ReflectionUtils.copy(conf, key, qkey);
  }

  /**
   * Return true if the RR- including the k,v pair stored in this object-
   * is not exhausted.
   */
  public boolean hasNext() {
    return !empty;
  }

  /**
   * Skip key-value pairs with keys less than or equal to the key provided.
   * Does nothing if this RR is already exhausted.
   */
  public void skip(K key) throws IOException, InterruptedException {
    if (hasNext()) {
      while (cmp.compare(key(), key) <= 0 && next()) {
        // next() advances the head; nothing else to do per pair
      }
    }
  }

  /**
   * Add an iterator to the collector at the position occupied by this
   * RecordReader over the values in this stream paired with the key
   * provided (ie register a stream of values from this source matching K
   * with a collector). The iterator is registered even when no value
   * matched, in which case it has been cleared and nothing added to it.
   */
  @SuppressWarnings("unchecked")
  public void accept(CompositeRecordReader.JoinCollector i, K key)
      throws IOException, InterruptedException {
    vjoin.clear();
    if (key() != null && 0 == cmp.compare(key, key())) {
      do {
        vjoin.add(value);
      } while (next() && 0 == cmp.compare(key, key()));
    }
    i.add(id, vjoin);
  }

  /**
   * If this object is not yet exhausted, read the next k,v pair from the
   * proxied RR into the head of this object.
   * @return true if a read was attempted, i.e. this object was not
   *         exhausted before the call (even if that read found the proxied
   *         RR to be exhausted); false if it was already exhausted
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (hasNext()) {
      next();
      return true;
    }
    return false;
  }

  /**
   * Read the next k,v pair into the head of this object; return true iff
   * the proxied RR supplied another pair (i.e. it is not exhausted).
   */
  private boolean next() throws IOException, InterruptedException {
    empty = !rr.nextKeyValue();
    key = rr.getCurrentKey();
    value = rr.getCurrentValue();
    return !empty;
  }

  /**
   * Get current key from the proxied RR.
   */
  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return rr.getCurrentKey();
  }

  /**
   * Get current value from the proxied RR.
   */
  @Override
  public U getCurrentValue() throws IOException, InterruptedException {
    return rr.getCurrentValue();
  }

  /**
   * Request progress from proxied RR.
   */
  @Override
  public float getProgress() throws IOException, InterruptedException {
    return rr.getProgress();
  }

  /**
   * Forward close request to proxied RR.
   */
  @Override
  public void close() throws IOException {
    rr.close();
  }

  /**
   * Implement Comparable contract (compare key at head of proxied RR
   * with that of another).
   */
  public int compareTo(ComposableRecordReader<K,?> other) {
    return cmp.compare(key(), other.key());
  }

  /**
   * Return true iff other is a ComposableRecordReader and
   * compareTo(other) returns 0.
   */
  @Override
  @SuppressWarnings("unchecked") // Explicit type check prior to cast
  public boolean equals(Object other) {
    return other instanceof ComposableRecordReader
        && 0 == compareTo((ComposableRecordReader)other);
  }

  /**
   * Not designed for use: fails an assertion when assertions are enabled,
   * otherwise returns a constant.
   */
  @Override
  public int hashCode() {
    assert false : "hashCode not designed";
    return 42;
  }
}