/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * A RecordReader that can effect joins of RecordReaders sharing a common key
 * type and partitioning.
 *
 * <p>Child readers are held both in a fixed array indexed by their
 * {@code id()} and, while they still have records, in a priority queue
 * ordered by their current key. Values for the key currently being joined
 * are accumulated in a {@link JoinCollector}.
 *
 * @param <K> key type
 * @param <V> value type of the child RecordReaders accepted
 * @param <X> type of the Writables emitted
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CompositeRecordReader<
    K extends WritableComparable, // key type
    V extends Writable,           // accepts RecordReader<K,V> as children
    X extends Writable>           // emits Writables of this type
    implements Configurable {

  /** Position this reader occupies in its parent's collector. */
  private final int id;
  private Configuration conf;
  /** Placeholder iterator used for every collector slot without values. */
  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();

  /** Key comparator; set in the constructor or lazily on the first add(). */
  private WritableComparator cmp;
  /** Key class shared by all children; resolved lazily by createKey(). */
  private Class<? extends WritableComparable> keyclass;
  /** Children that still have records, ordered by their current key. */
  private PriorityQueue<ComposableRecordReader<K,?>> q;

  protected final JoinCollector jc;
  protected final ComposableRecordReader<K,? extends V>[] kids;

  /**
   * Decide whether the tuple assembled from the given sources should be
   * emitted by this join.
   *
   * @param srcs  the child readers of this composite (see
   *              {@link JoinCollector#flush(TupleWritable)})
   * @param value the candidate tuple
   * @return true if the tuple should be emitted
   */
  protected abstract boolean combine(Object[] srcs, TupleWritable value);

  /**
   * Create a RecordReader with {@code capacity} children to position
   * {@code id} in the parent reader.
   * The id of a root CompositeRecordReader is -1 by convention, but relying
   * on this is not recommended.
   *
   * @param id       position of this reader in the parent's collector
   * @param capacity number of child slots to allocate
   * @param cmpcl    comparator class used to order keys; if null, the
   *                 comparator is derived from the key class of the first
   *                 child passed to {@link #add(ComposableRecordReader)}
   * @throws IOException declared for subclasses; not thrown by the code here
   */
  @SuppressWarnings("unchecked") // Generic array assignment
  public CompositeRecordReader(int id, int capacity,
      Class<? extends WritableComparator> cmpcl)
      throws IOException {
    assert capacity > 0 : "Invalid capacity";
    this.id = id;
    if (null != cmpcl) {
      cmp = ReflectionUtils.newInstance(cmpcl, null);
      q = newReaderQueue();
    }
    jc = new JoinCollector(capacity);
    kids = new ComposableRecordReader[capacity];
  }

  /**
   * Build the queue that orders child readers by their current key.
   * The comparator reads the {@code cmp} field at comparison time, so
   * {@code cmp} must be assigned before any element is inserted.
   */
  private PriorityQueue<ComposableRecordReader<K,?>> newReaderQueue() {
    return new PriorityQueue<ComposableRecordReader<K,?>>(3,
        new Comparator<ComposableRecordReader<K,?>>() {
          @Override
          public int compare(ComposableRecordReader<K,?> o1,
                             ComposableRecordReader<K,?> o2) {
            return cmp.compare(o1.key(), o2.key());
          }
        });
  }

  /**
   * True if the reader queue has not been created yet or holds no readers.
   * The queue is created lazily when no comparator class was supplied to the
   * constructor, so it may legitimately still be null.
   */
  private boolean queueIsEmpty() {
    return null == q || q.isEmpty();
  }

  /**
   * Return the position in the collector this class occupies.
   */
  public int id() {
    return id;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Return sorted list of RecordReaders for this composite.
   * May be null if no comparator class was given at construction and no
   * child has been added yet.
   */
  protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
    return q;
  }

  /**
   * Return comparator defining the ordering for RecordReaders in this
   * composite.
   */
  protected WritableComparator getComparator() {
    return cmp;
  }

  /**
   * Add a RecordReader to this collection.
   * The id() of a RecordReader determines where in the Tuple its
   * entry will appear. Adding RecordReaders with the same id has
   * undefined behavior.
   *
   * @param rr the child reader; it is queued only if it has a record
   * @throws IOException declared for subclasses; not thrown by the code here
   */
  public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
    kids[rr.id()] = rr;
    if (null == q) {
      // No comparator class was supplied: derive one from the child's key.
      cmp = WritableComparator.get(rr.createKey().getClass(), conf);
      q = newReaderQueue();
    }
    if (rr.hasNext()) {
      q.add(rr);
    }
  }

  /**
   * Collector for join values.
   * This accumulates values for a given key from the child RecordReaders. If
   * one or more child RR contain duplicate keys, this will emit the cross
   * product of the associated values until exhausted.
   */
  class JoinCollector {
    private K key;
    private final ResetableIterator<X>[] iters;
    /** Index of the iterator last advanced; negative when exhausted. */
    private int pos = -1;
    /** True until the first tuple after reset(K) has been produced. */
    private boolean first = true;

    /**
     * Construct a collector capable of handling the specified number of
     * children.
     */
    @SuppressWarnings("unchecked") // Generic array assignment
    public JoinCollector(int card) {
      iters = new ResetableIterator[card];
      for (int i = 0; i < iters.length; ++i) {
        iters[i] = EMPTY;
      }
    }

    /**
     * Register a given iterator at position id.
     */
    public void add(int id, ResetableIterator<X> i)
        throws IOException {
      iters[id] = i;
    }

    /**
     * Return the key associated with this collection.
     */
    public K key() {
      return key;
    }

    /**
     * Codify the contents of the collector to be iterated over.
     * When this is called, all RecordReaders registered for this
     * key should have added ResetableIterators.
     */
    public void reset(K key) {
      this.key = key;
      first = true;
      pos = iters.length - 1;
      for (int i = 0; i < iters.length; ++i) {
        iters[i].reset();
      }
    }

    /**
     * Clear all state information.
     * Every slot is cleared and then pointed back at the shared empty
     * iterator.
     */
    public void clear() {
      key = null;
      pos = -1;
      for (int i = 0; i < iters.length; ++i) {
        iters[i].clear();
        iters[i] = EMPTY;
      }
    }

    /**
     * Returns false if exhausted or if reset(K) has not been called.
     */
    protected boolean hasNext() {
      return !(pos < 0);
    }

    /**
     * Populate Tuple from iterators.
     * It should be the case that, given iterators i_1...i_n over values from
     * sources s_1...s_n sharing key k, repeated calls to next should yield
     * I x I.
     *
     * @param val tuple whose positions are filled in and marked written
     * @return true if a tuple was produced; false once the collector is
     *         exhausted, in which case it has also been cleared
     */
    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
    protected boolean next(TupleWritable val) throws IOException {
      if (first) {
        // First tuple for this key: take one value from every iterator that
        // has one, remembering the right-most position that produced a value.
        int i = -1;
        for (pos = 0; pos < iters.length; ++pos) {
          if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
            i = pos;
            val.setWritten(i);
          }
        }
        pos = i;
        first = false;
        if (pos < 0) {
          clear();
          return false;
        }
        return true;
      }
      // Walk left from the current position until an iterator can advance.
      while (0 <= pos && !(iters[pos].hasNext() &&
                           iters[pos].next((X)val.get(pos)))) {
        --pos;
      }
      if (pos < 0) {
        clear();
        return false;
      }
      val.setWritten(pos);
      // Positions to the left keep their last value.
      for (int i = 0; i < pos; ++i) {
        if (iters[i].replay((X)val.get(i))) {
          val.setWritten(i);
        }
      }
      // Positions to the right start over from their first value.
      while (pos + 1 < iters.length) {
        ++pos;
        iters[pos].reset();
        if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
          val.setWritten(pos);
        }
      }
      return true;
    }

    /**
     * Replay the last Tuple emitted.
     *
     * @param val tuple to repopulate
     * @return true if at least one position was replayed
     */
    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
    public boolean replay(TupleWritable val) throws IOException {
      // The last emitted tuple might have drawn on an empty source;
      // it can't be cleared prematurely, b/c there may be more duplicate
      // keys in iterator positions < pos
      assert !first;
      boolean ret = false;
      for (int i = 0; i < iters.length; ++i) {
        if (iters[i].replay((X)val.get(i))) {
          val.setWritten(i);
          ret = true;
        }
      }
      return ret;
    }

    /**
     * Close all child iterators.
     */
    public void close() throws IOException {
      for (int i = 0; i < iters.length; ++i) {
        iters[i].close();
      }
    }

    /**
     * Write the next value into key, value as accepted by the operation
     * associated with this set of RecordReaders.
     *
     * @param value tuple to populate; its written flags are cleared before
     *              each attempt
     * @return true if a tuple accepted by combine() was produced
     */
    public boolean flush(TupleWritable value) throws IOException {
      while (hasNext()) {
        value.clearWritten();
        if (next(value) && combine(kids, value)) {
          return true;
        }
      }
      return false;
    }
  }

  /**
   * Return the key for the current join or the value at the top of the
   * RecordReader heap.
   *
   * @return the key, or null if neither the collector nor the heap has one
   */
  public K key() {
    if (jc.hasNext()) {
      return jc.key();
    }
    if (!queueIsEmpty()) {
      return q.peek().key();
    }
    return null;
  }

  /**
   * Clone the key at the top of this RR into the given object.
   */
  public void key(K key) throws IOException {
    WritableUtils.cloneInto(key, key());
  }

  /**
   * Return true if it is possible that this could emit more values.
   */
  public boolean hasNext() {
    return jc.hasNext() || !queueIsEmpty();
  }

  /**
   * Pass skip key to child RRs.
   * Every queued child whose current key is less than or equal to the given
   * key is removed, asked to skip, and re-queued if it still has records.
   */
  public void skip(K key) throws IOException {
    ArrayList<ComposableRecordReader<K,?>> tmp =
      new ArrayList<ComposableRecordReader<K,?>>();
    while (!queueIsEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
      tmp.add(q.poll());
    }
    for (ComposableRecordReader<K,?> rr : tmp) {
      rr.skip(key);
      if (rr.hasNext()) {
        q.add(rr);
      }
    }
  }

  /**
   * Obtain an iterator over the child RRs apropos of the value type
   * ultimately emitted from this join.
   */
  protected abstract ResetableIterator<X> getDelegate();

  /**
   * If key provided matches that of this Composite, give JoinCollector
   * iterator over values it may emit.
   *
   * <p>Note that the parameter shadows this reader's own collector field:
   * on a match, this reader's own collector is filled from its children and
   * the collector passed in receives this reader's delegate at position
   * {@code id}. Otherwise the passed collector receives the empty iterator.
   */
  @SuppressWarnings("unchecked") // No values from static EMPTY class
  public void accept(CompositeRecordReader.JoinCollector jc, K key)
      throws IOException {
    if (hasNext() && 0 == cmp.compare(key, key())) {
      fillJoinCollector(createKey());
      jc.add(id, getDelegate());
      return;
    }
    jc.add(id, EMPTY);
  }

  /**
   * For all child RRs offering the key provided, obtain an iterator
   * at that position in the JoinCollector.
   *
   * @param iterkey scratch key; overwritten with the key at the head of the
   *                heap, which is the key that gets collected
   */
  protected void fillJoinCollector(K iterkey) throws IOException {
    if (!queueIsEmpty()) {
      q.peek().key(iterkey);
      while (0 == cmp.compare(q.peek().key(), iterkey)) {
        ComposableRecordReader<K,?> t = q.poll();
        t.accept(jc, iterkey);
        if (t.hasNext()) {
          q.add(t);
        } else if (q.isEmpty()) {
          // Last reader drained: nothing left to peek at.
          return;
        }
      }
    }
  }

  /**
   * Implement Comparable contract (compare key of join or head of heap
   * with that of another).
   */
  public int compareTo(ComposableRecordReader<K,?> other) {
    return cmp.compare(key(), other.key());
  }

  /**
   * Create a new key value common to all child RRs.
   * The key class is resolved on the first call by asking every child slot
   * for a key, so all child positions must have been populated by then.
   *
   * @throws ClassCastException if key classes differ.
   */
  @SuppressWarnings("unchecked") // Explicit check for key class agreement
  public K createKey() {
    if (null == keyclass) {
      final Class<?> cls = kids[0].createKey().getClass();
      for (RecordReader<K,? extends Writable> rr : kids) {
        if (!cls.equals(rr.createKey().getClass())) {
          throw new ClassCastException("Child key classes fail to agree");
        }
      }
      keyclass = cls.asSubclass(WritableComparable.class);
    }
    return (K) ReflectionUtils.newInstance(keyclass, getConf());
  }

  /**
   * Create a value to be used internally for joins.
   * One value is created per child slot, in slot order.
   */
  protected TupleWritable createInternalValue() {
    Writable[] vals = new Writable[kids.length];
    for (int i = 0; i < vals.length; ++i) {
      vals[i] = kids[i].createValue();
    }
    return new TupleWritable(vals);
  }

  /**
   * Unsupported (returns zero in all cases).
   */
  public long getPos() throws IOException {
    return 0;
  }

  /**
   * Close all child RRs.
   * Every populated child and the collector are closed even if one of them
   * fails; child slots that were never populated are skipped.
   *
   * @throws IOException the first failure encountered while closing; later
   *         failures are not reported
   */
  public void close() throws IOException {
    IOException firstFailure = null;
    if (kids != null) {
      for (RecordReader<K,? extends Writable> rr : kids) {
        if (null == rr) {
          continue; // slot never populated via add()
        }
        try {
          rr.close();
        } catch (IOException e) {
          if (null == firstFailure) {
            firstFailure = e;
          }
        }
      }
    }
    if (jc != null) {
      try {
        jc.close();
      } catch (IOException e) {
        if (null == firstFailure) {
          firstFailure = e;
        }
      }
    }
    if (null != firstFailure) {
      throw firstFailure;
    }
  }

  /**
   * Report progress as the minimum of all child RR progress.
   * Child slots that were never populated are ignored; with no populated
   * children the result is 1.0.
   */
  public float getProgress() throws IOException {
    float ret = 1.0f;
    for (RecordReader<K,? extends Writable> rr : kids) {
      if (null != rr) {
        ret = Math.min(ret, rr.getProgress());
      }
    }
    return ret;
  }
}