001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.join; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Comparator; 024import java.util.PriorityQueue; 025 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.conf.Configurable; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.io.NullWritable; 031import org.apache.hadoop.io.Writable; 032import org.apache.hadoop.io.WritableComparable; 033import org.apache.hadoop.io.WritableComparator; 034import org.apache.hadoop.mapreduce.InputSplit; 035import org.apache.hadoop.mapreduce.RecordReader; 036import org.apache.hadoop.mapreduce.TaskAttemptContext; 037import org.apache.hadoop.util.ReflectionUtils; 038 039/** 040 * A RecordReader that can effect joins of RecordReaders sharing a common key 041 * type and partitioning. 042 */ 043@InterfaceAudience.Public 044@InterfaceStability.Stable 045public abstract class CompositeRecordReader< 046 K extends WritableComparable<?>, // key type 047 V extends Writable, // accepts RecordReader<K,V> as children 048 X extends Writable> // emits Writables of this type 049 extends ComposableRecordReader<K, X> 050 implements Configurable { 051 052 private int id; 053 protected Configuration conf; 054 private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>(); 055 056 private WritableComparator cmp; 057 @SuppressWarnings("unchecked") 058 protected Class<? extends WritableComparable> keyclass = null; 059 private PriorityQueue<ComposableRecordReader<K,?>> q; 060 061 protected final JoinCollector jc; 062 protected final ComposableRecordReader<K,? extends V>[] kids; 063 064 protected abstract boolean combine(Object[] srcs, TupleWritable value); 065 066 protected K key; 067 protected X value; 068 069 /** 070 * Create a RecordReader with <tt>capacity</tt> children to position 071 * <tt>id</tt> in the parent reader. 072 * The id of a root CompositeRecordReader is -1 by convention, but relying 073 * on this is not recommended. 074 */ 075 @SuppressWarnings("unchecked") // Generic array assignment 076 public CompositeRecordReader(int id, int capacity, 077 Class<? extends WritableComparator> cmpcl) 078 throws IOException { 079 assert capacity > 0 : "Invalid capacity"; 080 this.id = id; 081 if (null != cmpcl) { 082 cmp = ReflectionUtils.newInstance(cmpcl, null); 083 q = new PriorityQueue<ComposableRecordReader<K,?>>(3, 084 new Comparator<ComposableRecordReader<K,?>>() { 085 public int compare(ComposableRecordReader<K,?> o1, 086 ComposableRecordReader<K,?> o2) { 087 return cmp.compare(o1.key(), o2.key()); 088 } 089 }); 090 } 091 jc = new JoinCollector(capacity); 092 kids = new ComposableRecordReader[capacity]; 093 } 094 095 @SuppressWarnings("unchecked") 096 public void initialize(InputSplit split, TaskAttemptContext context) 097 throws IOException, InterruptedException { 098 if (kids != null) { 099 for (int i = 0; i < kids.length; ++i) { 100 kids[i].initialize(((CompositeInputSplit)split).get(i), context); 101 if (kids[i].key() == null) { 102 continue; 103 } 104 105 // get keyclass 106 if (keyclass == null) { 107 keyclass = kids[i].createKey().getClass(). 108 asSubclass(WritableComparable.class); 109 } 110 // create priority queue 111 if (null == q) { 112 cmp = WritableComparator.get(keyclass, conf); 113 q = new PriorityQueue<ComposableRecordReader<K,?>>(3, 114 new Comparator<ComposableRecordReader<K,?>>() { 115 public int compare(ComposableRecordReader<K,?> o1, 116 ComposableRecordReader<K,?> o2) { 117 return cmp.compare(o1.key(), o2.key()); 118 } 119 }); 120 } 121 // Explicit check for key class agreement 122 if (!keyclass.equals(kids[i].key().getClass())) { 123 throw new ClassCastException("Child key classes fail to agree"); 124 } 125 126 // add the kid to priority queue if it has any elements 127 if (kids[i].hasNext()) { 128 q.add(kids[i]); 129 } 130 } 131 } 132 } 133 134 /** 135 * Return the position in the collector this class occupies. 136 */ 137 public int id() { 138 return id; 139 } 140 141 /** 142 * {@inheritDoc} 143 */ 144 public void setConf(Configuration conf) { 145 this.conf = conf; 146 } 147 148 /** 149 * {@inheritDoc} 150 */ 151 public Configuration getConf() { 152 return conf; 153 } 154 155 /** 156 * Return sorted list of RecordReaders for this composite. 157 */ 158 protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() { 159 return q; 160 } 161 162 /** 163 * Return comparator defining the ordering for RecordReaders in this 164 * composite. 165 */ 166 protected WritableComparator getComparator() { 167 return cmp; 168 } 169 170 /** 171 * Add a RecordReader to this collection. 172 * The id() of a RecordReader determines where in the Tuple its 173 * entry will appear. Adding RecordReaders with the same id has 174 * undefined behavior. 175 */ 176 public void add(ComposableRecordReader<K,? extends V> rr) 177 throws IOException, InterruptedException { 178 kids[rr.id()] = rr; 179 } 180 181 /** 182 * Collector for join values. 183 * This accumulates values for a given key from the child RecordReaders. If 184 * one or more child RR contain duplicate keys, this will emit the cross 185 * product of the associated values until exhausted. 186 */ 187 public class JoinCollector { 188 private K key; 189 private ResetableIterator<X>[] iters; 190 private int pos = -1; 191 private boolean first = true; 192 193 /** 194 * Construct a collector capable of handling the specified number of 195 * children. 196 */ 197 @SuppressWarnings("unchecked") // Generic array assignment 198 public JoinCollector(int card) { 199 iters = new ResetableIterator[card]; 200 for (int i = 0; i < iters.length; ++i) { 201 iters[i] = EMPTY; 202 } 203 } 204 205 /** 206 * Register a given iterator at position id. 207 */ 208 public void add(int id, ResetableIterator<X> i) 209 throws IOException { 210 iters[id] = i; 211 } 212 213 /** 214 * Return the key associated with this collection. 215 */ 216 public K key() { 217 return key; 218 } 219 220 /** 221 * Codify the contents of the collector to be iterated over. 222 * When this is called, all RecordReaders registered for this 223 * key should have added ResetableIterators. 224 */ 225 public void reset(K key) { 226 this.key = key; 227 first = true; 228 pos = iters.length - 1; 229 for (int i = 0; i < iters.length; ++i) { 230 iters[i].reset(); 231 } 232 } 233 234 /** 235 * Clear all state information. 236 */ 237 public void clear() { 238 key = null; 239 pos = -1; 240 for (int i = 0; i < iters.length; ++i) { 241 iters[i].clear(); 242 iters[i] = EMPTY; 243 } 244 } 245 246 /** 247 * Returns false if exhausted or if reset(K) has not been called. 248 */ 249 public boolean hasNext() { 250 return !(pos < 0); 251 } 252 253 /** 254 * Populate Tuple from iterators. 255 * It should be the case that, given iterators i_1...i_n over values from 256 * sources s_1...s_n sharing key k, repeated calls to next should yield 257 * I x I. 258 */ 259 @SuppressWarnings("unchecked") // No static type info on Tuples 260 protected boolean next(TupleWritable val) throws IOException { 261 if (first) { 262 int i = -1; 263 for (pos = 0; pos < iters.length; ++pos) { 264 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) { 265 i = pos; 266 val.setWritten(i); 267 } 268 } 269 pos = i; 270 first = false; 271 if (pos < 0) { 272 clear(); 273 return false; 274 } 275 return true; 276 } 277 while (0 <= pos && !(iters[pos].hasNext() && 278 iters[pos].next((X)val.get(pos)))) { 279 --pos; 280 } 281 if (pos < 0) { 282 clear(); 283 return false; 284 } 285 val.setWritten(pos); 286 for (int i = 0; i < pos; ++i) { 287 if (iters[i].replay((X)val.get(i))) { 288 val.setWritten(i); 289 } 290 } 291 while (pos + 1 < iters.length) { 292 ++pos; 293 iters[pos].reset(); 294 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) { 295 val.setWritten(pos); 296 } 297 } 298 return true; 299 } 300 301 /** 302 * Replay the last Tuple emitted. 303 */ 304 @SuppressWarnings("unchecked") // No static typeinfo on Tuples 305 public boolean replay(TupleWritable val) throws IOException { 306 // The last emitted tuple might have drawn on an empty source; 307 // it can't be cleared prematurely, b/c there may be more duplicate 308 // keys in iterator positions < pos 309 assert !first; 310 boolean ret = false; 311 for (int i = 0; i < iters.length; ++i) { 312 if (iters[i].replay((X)val.get(i))) { 313 val.setWritten(i); 314 ret = true; 315 } 316 } 317 return ret; 318 } 319 320 /** 321 * Close all child iterators. 322 */ 323 public void close() throws IOException { 324 for (int i = 0; i < iters.length; ++i) { 325 iters[i].close(); 326 } 327 } 328 329 /** 330 * Write the next value into key, value as accepted by the operation 331 * associated with this set of RecordReaders. 332 */ 333 public boolean flush(TupleWritable value) throws IOException { 334 while (hasNext()) { 335 value.clearWritten(); 336 if (next(value) && combine(kids, value)) { 337 return true; 338 } 339 } 340 return false; 341 } 342 } 343 344 /** 345 * Return the key for the current join or the value at the top of the 346 * RecordReader heap. 347 */ 348 public K key() { 349 if (jc.hasNext()) { 350 return jc.key(); 351 } 352 if (!q.isEmpty()) { 353 return q.peek().key(); 354 } 355 return null; 356 } 357 358 /** 359 * Clone the key at the top of this RR into the given object. 360 */ 361 public void key(K key) throws IOException { 362 ReflectionUtils.copy(conf, key(), key); 363 } 364 365 public K getCurrentKey() { 366 return key; 367 } 368 369 /** 370 * Return true if it is possible that this could emit more values. 371 */ 372 public boolean hasNext() { 373 return jc.hasNext() || !q.isEmpty(); 374 } 375 376 /** 377 * Pass skip key to child RRs. 378 */ 379 public void skip(K key) throws IOException, InterruptedException { 380 ArrayList<ComposableRecordReader<K,?>> tmp = 381 new ArrayList<ComposableRecordReader<K,?>>(); 382 while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) { 383 tmp.add(q.poll()); 384 } 385 for (ComposableRecordReader<K,?> rr : tmp) { 386 rr.skip(key); 387 if (rr.hasNext()) { 388 q.add(rr); 389 } 390 } 391 } 392 393 /** 394 * Obtain an iterator over the child RRs apropos of the value type 395 * ultimately emitted from this join. 396 */ 397 protected abstract ResetableIterator<X> getDelegate(); 398 399 /** 400 * If key provided matches that of this Composite, give JoinCollector 401 * iterator over values it may emit. 402 */ 403 @SuppressWarnings("unchecked") // No values from static EMPTY class 404 @Override 405 public void accept(CompositeRecordReader.JoinCollector jc, K key) 406 throws IOException, InterruptedException { 407 if (hasNext() && 0 == cmp.compare(key, key())) { 408 fillJoinCollector(createKey()); 409 jc.add(id, getDelegate()); 410 return; 411 } 412 jc.add(id, EMPTY); 413 } 414 415 /** 416 * For all child RRs offering the key provided, obtain an iterator 417 * at that position in the JoinCollector. 418 */ 419 protected void fillJoinCollector(K iterkey) 420 throws IOException, InterruptedException { 421 if (!q.isEmpty()) { 422 q.peek().key(iterkey); 423 while (0 == cmp.compare(q.peek().key(), iterkey)) { 424 ComposableRecordReader<K,?> t = q.poll(); 425 t.accept(jc, iterkey); 426 if (t.hasNext()) { 427 q.add(t); 428 } else if (q.isEmpty()) { 429 return; 430 } 431 } 432 } 433 } 434 435 /** 436 * Implement Comparable contract (compare key of join or head of heap 437 * with that of another). 438 */ 439 public int compareTo(ComposableRecordReader<K,?> other) { 440 return cmp.compare(key(), other.key()); 441 } 442 443 /** 444 * Create a new key common to all child RRs. 445 * @throws ClassCastException if key classes differ. 446 */ 447 @SuppressWarnings("unchecked") 448 protected K createKey() { 449 if (keyclass == null || keyclass.equals(NullWritable.class)) { 450 return (K) NullWritable.get(); 451 } 452 return (K) ReflectionUtils.newInstance(keyclass, getConf()); 453 } 454 455 /** 456 * Create a value to be used internally for joins. 457 */ 458 protected TupleWritable createTupleWritable() { 459 Writable[] vals = new Writable[kids.length]; 460 for (int i = 0; i < vals.length; ++i) { 461 vals[i] = kids[i].createValue(); 462 } 463 return new TupleWritable(vals); 464 } 465 466 /** {@inheritDoc} */ 467 public X getCurrentValue() 468 throws IOException, InterruptedException { 469 return value; 470 } 471 472 /** 473 * Close all child RRs. 474 */ 475 public void close() throws IOException { 476 if (kids != null) { 477 for (RecordReader<K,? extends Writable> rr : kids) { 478 rr.close(); 479 } 480 } 481 if (jc != null) { 482 jc.close(); 483 } 484 } 485 486 /** 487 * Report progress as the minimum of all child RR progress. 488 */ 489 public float getProgress() throws IOException, InterruptedException { 490 float ret = 1.0f; 491 for (RecordReader<K,? extends Writable> rr : kids) { 492 ret = Math.min(ret, rr.getProgress()); 493 } 494 return ret; 495 } 496 497}