001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Comparator;
024 import java.util.PriorityQueue;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configurable;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.io.Writable;
031 import org.apache.hadoop.io.WritableComparable;
032 import org.apache.hadoop.io.WritableComparator;
033 import org.apache.hadoop.io.WritableUtils;
034 import org.apache.hadoop.mapred.RecordReader;
035 import org.apache.hadoop.util.ReflectionUtils;
036
037 /**
038 * A RecordReader that can effect joins of RecordReaders sharing a common key
039 * type and partitioning.
040 */
041 @InterfaceAudience.Public
042 @InterfaceStability.Stable
043 public abstract class CompositeRecordReader<
044 K extends WritableComparable, // key type
045 V extends Writable, // accepts RecordReader<K,V> as children
046 X extends Writable> // emits Writables of this type
047 implements Configurable {
048
049
050 private int id;
051 private Configuration conf;
052 private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
053
054 private WritableComparator cmp;
055 private Class<? extends WritableComparable> keyclass;
056 private PriorityQueue<ComposableRecordReader<K,?>> q;
057
058 protected final JoinCollector jc;
059 protected final ComposableRecordReader<K,? extends V>[] kids;
060
061 protected abstract boolean combine(Object[] srcs, TupleWritable value);
062
063 /**
064 * Create a RecordReader with <tt>capacity</tt> children to position
065 * <tt>id</tt> in the parent reader.
066 * The id of a root CompositeRecordReader is -1 by convention, but relying
067 * on this is not recommended.
068 */
069 @SuppressWarnings("unchecked") // Generic array assignment
070 public CompositeRecordReader(int id, int capacity,
071 Class<? extends WritableComparator> cmpcl)
072 throws IOException {
073 assert capacity > 0 : "Invalid capacity";
074 this.id = id;
075 if (null != cmpcl) {
076 cmp = ReflectionUtils.newInstance(cmpcl, null);
077 q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
078 new Comparator<ComposableRecordReader<K,?>>() {
079 public int compare(ComposableRecordReader<K,?> o1,
080 ComposableRecordReader<K,?> o2) {
081 return cmp.compare(o1.key(), o2.key());
082 }
083 });
084 }
085 jc = new JoinCollector(capacity);
086 kids = new ComposableRecordReader[capacity];
087 }
088
089 /**
090 * Return the position in the collector this class occupies.
091 */
092 public int id() {
093 return id;
094 }
095
096 /**
097 * {@inheritDoc}
098 */
099 public void setConf(Configuration conf) {
100 this.conf = conf;
101 }
102
103 /**
104 * {@inheritDoc}
105 */
106 public Configuration getConf() {
107 return conf;
108 }
109
110 /**
111 * Return sorted list of RecordReaders for this composite.
112 */
113 protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
114 return q;
115 }
116
117 /**
118 * Return comparator defining the ordering for RecordReaders in this
119 * composite.
120 */
121 protected WritableComparator getComparator() {
122 return cmp;
123 }
124
125 /**
126 * Add a RecordReader to this collection.
127 * The id() of a RecordReader determines where in the Tuple its
128 * entry will appear. Adding RecordReaders with the same id has
129 * undefined behavior.
130 */
131 public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
132 kids[rr.id()] = rr;
133 if (null == q) {
134 cmp = WritableComparator.get(rr.createKey().getClass(), conf);
135 q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
136 new Comparator<ComposableRecordReader<K,?>>() {
137 public int compare(ComposableRecordReader<K,?> o1,
138 ComposableRecordReader<K,?> o2) {
139 return cmp.compare(o1.key(), o2.key());
140 }
141 });
142 }
143 if (rr.hasNext()) {
144 q.add(rr);
145 }
146 }
147
148 /**
149 * Collector for join values.
150 * This accumulates values for a given key from the child RecordReaders. If
151 * one or more child RR contain duplicate keys, this will emit the cross
152 * product of the associated values until exhausted.
153 */
154 class JoinCollector {
155 private K key;
156 private ResetableIterator<X>[] iters;
157 private int pos = -1;
158 private boolean first = true;
159
160 /**
161 * Construct a collector capable of handling the specified number of
162 * children.
163 */
164 @SuppressWarnings("unchecked") // Generic array assignment
165 public JoinCollector(int card) {
166 iters = new ResetableIterator[card];
167 for (int i = 0; i < iters.length; ++i) {
168 iters[i] = EMPTY;
169 }
170 }
171
172 /**
173 * Register a given iterator at position id.
174 */
175 public void add(int id, ResetableIterator<X> i)
176 throws IOException {
177 iters[id] = i;
178 }
179
180 /**
181 * Return the key associated with this collection.
182 */
183 public K key() {
184 return key;
185 }
186
187 /**
188 * Codify the contents of the collector to be iterated over.
189 * When this is called, all RecordReaders registered for this
190 * key should have added ResetableIterators.
191 */
192 public void reset(K key) {
193 this.key = key;
194 first = true;
195 pos = iters.length - 1;
196 for (int i = 0; i < iters.length; ++i) {
197 iters[i].reset();
198 }
199 }
200
201 /**
202 * Clear all state information.
203 */
204 public void clear() {
205 key = null;
206 pos = -1;
207 for (int i = 0; i < iters.length; ++i) {
208 iters[i].clear();
209 iters[i] = EMPTY;
210 }
211 }
212
213 /**
214 * Returns false if exhausted or if reset(K) has not been called.
215 */
216 protected boolean hasNext() {
217 return !(pos < 0);
218 }
219
220 /**
221 * Populate Tuple from iterators.
222 * It should be the case that, given iterators i_1...i_n over values from
223 * sources s_1...s_n sharing key k, repeated calls to next should yield
224 * I x I.
225 */
226 @SuppressWarnings("unchecked") // No static typeinfo on Tuples
227 protected boolean next(TupleWritable val) throws IOException {
228 if (first) {
229 int i = -1;
230 for (pos = 0; pos < iters.length; ++pos) {
231 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
232 i = pos;
233 val.setWritten(i);
234 }
235 }
236 pos = i;
237 first = false;
238 if (pos < 0) {
239 clear();
240 return false;
241 }
242 return true;
243 }
244 while (0 <= pos && !(iters[pos].hasNext() &&
245 iters[pos].next((X)val.get(pos)))) {
246 --pos;
247 }
248 if (pos < 0) {
249 clear();
250 return false;
251 }
252 val.setWritten(pos);
253 for (int i = 0; i < pos; ++i) {
254 if (iters[i].replay((X)val.get(i))) {
255 val.setWritten(i);
256 }
257 }
258 while (pos + 1 < iters.length) {
259 ++pos;
260 iters[pos].reset();
261 if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
262 val.setWritten(pos);
263 }
264 }
265 return true;
266 }
267
268 /**
269 * Replay the last Tuple emitted.
270 */
271 @SuppressWarnings("unchecked") // No static typeinfo on Tuples
272 public boolean replay(TupleWritable val) throws IOException {
273 // The last emitted tuple might have drawn on an empty source;
274 // it can't be cleared prematurely, b/c there may be more duplicate
275 // keys in iterator positions < pos
276 assert !first;
277 boolean ret = false;
278 for (int i = 0; i < iters.length; ++i) {
279 if (iters[i].replay((X)val.get(i))) {
280 val.setWritten(i);
281 ret = true;
282 }
283 }
284 return ret;
285 }
286
287 /**
288 * Close all child iterators.
289 */
290 public void close() throws IOException {
291 for (int i = 0; i < iters.length; ++i) {
292 iters[i].close();
293 }
294 }
295
296 /**
297 * Write the next value into key, value as accepted by the operation
298 * associated with this set of RecordReaders.
299 */
300 public boolean flush(TupleWritable value) throws IOException {
301 while (hasNext()) {
302 value.clearWritten();
303 if (next(value) && combine(kids, value)) {
304 return true;
305 }
306 }
307 return false;
308 }
309 }
310
311 /**
312 * Return the key for the current join or the value at the top of the
313 * RecordReader heap.
314 */
315 public K key() {
316 if (jc.hasNext()) {
317 return jc.key();
318 }
319 if (!q.isEmpty()) {
320 return q.peek().key();
321 }
322 return null;
323 }
324
325 /**
326 * Clone the key at the top of this RR into the given object.
327 */
328 public void key(K key) throws IOException {
329 WritableUtils.cloneInto(key, key());
330 }
331
332 /**
333 * Return true if it is possible that this could emit more values.
334 */
335 public boolean hasNext() {
336 return jc.hasNext() || !q.isEmpty();
337 }
338
339 /**
340 * Pass skip key to child RRs.
341 */
342 public void skip(K key) throws IOException {
343 ArrayList<ComposableRecordReader<K,?>> tmp =
344 new ArrayList<ComposableRecordReader<K,?>>();
345 while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
346 tmp.add(q.poll());
347 }
348 for (ComposableRecordReader<K,?> rr : tmp) {
349 rr.skip(key);
350 if (rr.hasNext()) {
351 q.add(rr);
352 }
353 }
354 }
355
356 /**
357 * Obtain an iterator over the child RRs apropos of the value type
358 * ultimately emitted from this join.
359 */
360 protected abstract ResetableIterator<X> getDelegate();
361
362 /**
363 * If key provided matches that of this Composite, give JoinCollector
364 * iterator over values it may emit.
365 */
366 @SuppressWarnings("unchecked") // No values from static EMPTY class
367 public void accept(CompositeRecordReader.JoinCollector jc, K key)
368 throws IOException {
369 if (hasNext() && 0 == cmp.compare(key, key())) {
370 fillJoinCollector(createKey());
371 jc.add(id, getDelegate());
372 return;
373 }
374 jc.add(id, EMPTY);
375 }
376
377 /**
378 * For all child RRs offering the key provided, obtain an iterator
379 * at that position in the JoinCollector.
380 */
381 protected void fillJoinCollector(K iterkey) throws IOException {
382 if (!q.isEmpty()) {
383 q.peek().key(iterkey);
384 while (0 == cmp.compare(q.peek().key(), iterkey)) {
385 ComposableRecordReader<K,?> t = q.poll();
386 t.accept(jc, iterkey);
387 if (t.hasNext()) {
388 q.add(t);
389 } else if (q.isEmpty()) {
390 return;
391 }
392 }
393 }
394 }
395
396 /**
397 * Implement Comparable contract (compare key of join or head of heap
398 * with that of another).
399 */
400 public int compareTo(ComposableRecordReader<K,?> other) {
401 return cmp.compare(key(), other.key());
402 }
403
404 /**
405 * Create a new key value common to all child RRs.
406 * @throws ClassCastException if key classes differ.
407 */
408 @SuppressWarnings("unchecked") // Explicit check for key class agreement
409 public K createKey() {
410 if (null == keyclass) {
411 final Class<?> cls = kids[0].createKey().getClass();
412 for (RecordReader<K,? extends Writable> rr : kids) {
413 if (!cls.equals(rr.createKey().getClass())) {
414 throw new ClassCastException("Child key classes fail to agree");
415 }
416 }
417 keyclass = cls.asSubclass(WritableComparable.class);
418 }
419 return (K) ReflectionUtils.newInstance(keyclass, getConf());
420 }
421
422 /**
423 * Create a value to be used internally for joins.
424 */
425 protected TupleWritable createInternalValue() {
426 Writable[] vals = new Writable[kids.length];
427 for (int i = 0; i < vals.length; ++i) {
428 vals[i] = kids[i].createValue();
429 }
430 return new TupleWritable(vals);
431 }
432
433 /**
434 * Unsupported (returns zero in all cases).
435 */
436 public long getPos() throws IOException {
437 return 0;
438 }
439
440 /**
441 * Close all child RRs.
442 */
443 public void close() throws IOException {
444 if (kids != null) {
445 for (RecordReader<K,? extends Writable> rr : kids) {
446 rr.close();
447 }
448 }
449 if (jc != null) {
450 jc.close();
451 }
452 }
453
454 /**
455 * Report progress as the minimum of all child RR progress.
456 */
457 public float getProgress() throws IOException {
458 float ret = 1.0f;
459 for (RecordReader<K,? extends Writable> rr : kids) {
460 ret = Math.min(ret, rr.getProgress());
461 }
462 return ret;
463 }
464 }