001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Comparator;
024import java.util.PriorityQueue;
025
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configurable;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.io.NullWritable;
031import org.apache.hadoop.io.Writable;
032import org.apache.hadoop.io.WritableComparable;
033import org.apache.hadoop.io.WritableComparator;
034import org.apache.hadoop.mapreduce.InputSplit;
035import org.apache.hadoop.mapreduce.RecordReader;
036import org.apache.hadoop.mapreduce.TaskAttemptContext;
037import org.apache.hadoop.util.ReflectionUtils;
038
039/**
040 * A RecordReader that can effect joins of RecordReaders sharing a common key
041 * type and partitioning.
042 */
043@InterfaceAudience.Public
044@InterfaceStability.Stable
045public abstract class CompositeRecordReader<
046    K extends WritableComparable<?>, // key type
047    V extends Writable,  // accepts RecordReader<K,V> as children
048    X extends Writable>  // emits Writables of this type
049    extends ComposableRecordReader<K, X>
050    implements Configurable {
051
  // Position of this reader in its parent's collector; returned by id().
  private int id;
  // Configuration supplied through setConf(); used when creating the
  // comparator and when copying/instantiating keys.
  protected Configuration conf;
  // Placeholder iterator stored in collector slots that have not been given
  // a real iterator (see JoinCollector's constructor, clear() and accept()).
  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();

  // Comparator used to order child readers by key. Instantiated from the
  // class passed to the constructor, or derived from keyclass in initialize().
  private WritableComparator cmp;
  // Key class shared by the children; taken in initialize() from the first
  // child that reports a non-null key.
  @SuppressWarnings("unchecked")
  protected Class<? extends WritableComparable> keyclass = null;
  // Child readers ordered by their current key. NOTE: this stays null until
  // either the constructor (comparator class given) or initialize() creates it.
  private PriorityQueue<ComposableRecordReader<K,?>> q;

  // Collector that accumulates the per-child iterators for the current key.
  protected final JoinCollector jc;
  // Child readers, indexed by each child's id() (see add()).
  protected final ComposableRecordReader<K,? extends V>[] kids;

  /**
   * Decide whether the tuple assembled from the children is acceptable.
   * JoinCollector.flush() returns a tuple only when this method returns true
   * for it; it is passed the array of child readers and the candidate tuple.
   */
  protected abstract boolean combine(Object[] srcs, TupleWritable value);
  
  // Current key/value as returned by getCurrentKey()/getCurrentValue().
  // Neither is assigned in this class; presumably subclasses maintain them.
  protected K key;
  protected X value;
068
069  /**
070   * Create a RecordReader with <tt>capacity</tt> children to position
071   * <tt>id</tt> in the parent reader.
072   * The id of a root CompositeRecordReader is -1 by convention, but relying
073   * on this is not recommended.
074   */
075  @SuppressWarnings("unchecked") // Generic array assignment
076  public CompositeRecordReader(int id, int capacity,
077      Class<? extends WritableComparator> cmpcl)
078      throws IOException {
079    assert capacity > 0 : "Invalid capacity";
080    this.id = id;
081    if (null != cmpcl) {
082      cmp = ReflectionUtils.newInstance(cmpcl, null);
083      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
084            new Comparator<ComposableRecordReader<K,?>>() {
085              public int compare(ComposableRecordReader<K,?> o1,
086                                 ComposableRecordReader<K,?> o2) {
087                return cmp.compare(o1.key(), o2.key());
088              }
089            });
090    }
091    jc = new JoinCollector(capacity);
092    kids = new ComposableRecordReader[capacity];
093  }
094
095  @SuppressWarnings("unchecked")
096  public void initialize(InputSplit split, TaskAttemptContext context) 
097      throws IOException, InterruptedException {
098    if (kids != null) {
099      for (int i = 0; i < kids.length; ++i) {
100        kids[i].initialize(((CompositeInputSplit)split).get(i), context);
101        if (kids[i].key() == null) {
102          continue;
103        }
104        
105        // get keyclass
106        if (keyclass == null) {
107          keyclass = kids[i].createKey().getClass().
108            asSubclass(WritableComparable.class);
109        }
110        // create priority queue
111        if (null == q) {
112          cmp = WritableComparator.get(keyclass, conf);
113          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
114                new Comparator<ComposableRecordReader<K,?>>() {
115                  public int compare(ComposableRecordReader<K,?> o1,
116                                     ComposableRecordReader<K,?> o2) {
117                    return cmp.compare(o1.key(), o2.key());
118                  }
119                });
120        }
121        // Explicit check for key class agreement
122        if (!keyclass.equals(kids[i].key().getClass())) {
123          throw new ClassCastException("Child key classes fail to agree");
124        }
125        
126        // add the kid to priority queue if it has any elements
127        if (kids[i].hasNext()) {
128          q.add(kids[i]);
129        }
130      }
131    }
132  }
133
134  /**
135   * Return the position in the collector this class occupies.
136   */
137  public int id() {
138    return id;
139  }
140
  /**
   * {@inheritDoc}
   * Stores the configuration for later use by this reader.
   */
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
147
148  /**
149   * {@inheritDoc}
150   */
151  public Configuration getConf() {
152    return conf;
153  }
154
155  /**
156   * Return sorted list of RecordReaders for this composite.
157   */
158  protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
159    return q;
160  }
161
162  /**
163   * Return comparator defining the ordering for RecordReaders in this
164   * composite.
165   */
166  protected WritableComparator getComparator() {
167    return cmp;
168  }
169
170  /**
171   * Add a RecordReader to this collection.
172   * The id() of a RecordReader determines where in the Tuple its
173   * entry will appear. Adding RecordReaders with the same id has
174   * undefined behavior.
175   */
176  public void add(ComposableRecordReader<K,? extends V> rr) 
177      throws IOException, InterruptedException {
178    kids[rr.id()] = rr;
179  }
180
181  /**
182   * Collector for join values.
183   * This accumulates values for a given key from the child RecordReaders. If
184   * one or more child RR contain duplicate keys, this will emit the cross
185   * product of the associated values until exhausted.
186   */
187  public class JoinCollector {
    // Key handed to the last reset(K); null again after clear().
    private K key;
    // One iterator per child position; slots without a real iterator hold
    // the enclosing reader's EMPTY placeholder.
    private ResetableIterator<X>[] iters;
    // Cursor into iters used by next(); a negative value means hasNext()
    // reports false.
    private int pos = -1;
    // True from reset(K) until the first subsequent call to next().
    private boolean first = true;
192
193    /**
194     * Construct a collector capable of handling the specified number of
195     * children.
196     */
197    @SuppressWarnings("unchecked") // Generic array assignment
198    public JoinCollector(int card) {
199      iters = new ResetableIterator[card];
200      for (int i = 0; i < iters.length; ++i) {
201        iters[i] = EMPTY;
202      }
203    }
204
205    /**
206     * Register a given iterator at position id.
207     */
208    public void add(int id, ResetableIterator<X> i)
209        throws IOException {
210      iters[id] = i;
211    }
212
213    /**
214     * Return the key associated with this collection.
215     */
216    public K key() {
217      return key;
218    }
219
220    /**
221     * Codify the contents of the collector to be iterated over.
222     * When this is called, all RecordReaders registered for this
223     * key should have added ResetableIterators.
224     */
225    public void reset(K key) {
226      this.key = key;
227      first = true;
228      pos = iters.length - 1;
229      for (int i = 0; i < iters.length; ++i) {
230        iters[i].reset();
231      }
232    }
233
234    /**
235     * Clear all state information.
236     */
237    public void clear() {
238      key = null;
239      pos = -1;
240      for (int i = 0; i < iters.length; ++i) {
241        iters[i].clear();
242        iters[i] = EMPTY;
243      }
244    }
245
246    /**
247     * Returns false if exhausted or if reset(K) has not been called.
248     */
249    public boolean hasNext() {
250      return !(pos < 0);
251    }
252
    /**
     * Populate Tuple from iterators.
     * It should be the case that, given iterators i_1...i_n over values from
     * sources s_1...s_n sharing key k, repeated calls to next should yield
     * the cross product i_1 x ... x i_n.
     *
     * @param val tuple whose positions are filled in place; each position
     *            that receives a value is marked as written
     * @return true if val was populated, false if the collector is exhausted
     *         (in which case the collector has been cleared)
     */
    @SuppressWarnings("unchecked") // No static type info on Tuples
    protected boolean next(TupleWritable val) throws IOException {
      if (first) {
        // First call after reset(K): take one value from every iterator that
        // has one, remembering the highest position that produced a value.
        int i = -1;
        for (pos = 0; pos < iters.length; ++pos) {
          if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
            i = pos;
            val.setWritten(i);
          }
        }
        pos = i;
        first = false;
        if (pos < 0) {
          // No iterator produced anything for this key.
          clear();
          return false;
        }
        return true;
      }
      // Later calls: walk back from pos to the nearest iterator that can
      // still advance; advancing it writes its next value into val.
      while (0 <= pos && !(iters[pos].hasNext() &&
                           iters[pos].next((X)val.get(pos)))) {
        --pos;
      }
      if (pos < 0) {
        // Every iterator is spent: the combinations for this key are done.
        clear();
        return false;
      }
      val.setWritten(pos);
      // Positions before pos keep their current value, which is replayed
      // into the tuple.
      for (int i = 0; i < pos; ++i) {
        if (iters[i].replay((X)val.get(i))) {
          val.setWritten(i);
        }
      }
      // Positions after pos start over from their first value; this also
      // leaves pos at the last position for the next call.
      while (pos + 1 < iters.length) {
        ++pos;
        iters[pos].reset();
        if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
          val.setWritten(pos);
        }
      }
      return true;
    }
300
301    /**
302     * Replay the last Tuple emitted.
303     */
304    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
305    public boolean replay(TupleWritable val) throws IOException {
306      // The last emitted tuple might have drawn on an empty source;
307      // it can't be cleared prematurely, b/c there may be more duplicate
308      // keys in iterator positions < pos
309      assert !first;
310      boolean ret = false;
311      for (int i = 0; i < iters.length; ++i) {
312        if (iters[i].replay((X)val.get(i))) {
313          val.setWritten(i);
314          ret = true;
315        }
316      }
317      return ret;
318    }
319
320    /**
321     * Close all child iterators.
322     */
323    public void close() throws IOException {
324      for (int i = 0; i < iters.length; ++i) {
325        iters[i].close();
326      }
327    }
328
329    /**
330     * Write the next value into key, value as accepted by the operation
331     * associated with this set of RecordReaders.
332     */
333    public boolean flush(TupleWritable value) throws IOException {
334      while (hasNext()) {
335        value.clearWritten();
336        if (next(value) && combine(kids, value)) {
337          return true;
338        }
339      }
340      return false;
341    }
342  }
343
344  /**
345   * Return the key for the current join or the value at the top of the
346   * RecordReader heap.
347   */
348  public K key() {
349    if (jc.hasNext()) {
350      return jc.key();
351    }
352    if (!q.isEmpty()) {
353      return q.peek().key();
354    }
355    return null;
356  }
357
358  /**
359   * Clone the key at the top of this RR into the given object.
360   */
361  public void key(K key) throws IOException {
362    ReflectionUtils.copy(conf, key(), key);
363  }
364
365  public K getCurrentKey() {
366    return key;
367  }
368  
369  /**
370   * Return true if it is possible that this could emit more values.
371   */
372  public boolean hasNext() {
373    return jc.hasNext() || !q.isEmpty();
374  }
375
376  /**
377   * Pass skip key to child RRs.
378   */
379  public void skip(K key) throws IOException, InterruptedException {
380    ArrayList<ComposableRecordReader<K,?>> tmp =
381      new ArrayList<ComposableRecordReader<K,?>>();
382    while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
383      tmp.add(q.poll());
384    }
385    for (ComposableRecordReader<K,?> rr : tmp) {
386      rr.skip(key);
387      if (rr.hasNext()) {
388        q.add(rr);
389      }
390    }
391  }
392
  /**
   * Obtain an iterator over the child RRs apropos of the value type
   * ultimately emitted from this join.
   * accept() registers the returned iterator with the supplied collector at
   * this reader's id once the child iterators have been gathered.
   */
  protected abstract ResetableIterator<X> getDelegate();
398
399  /**
400   * If key provided matches that of this Composite, give JoinCollector
401   * iterator over values it may emit.
402   */
403  @SuppressWarnings("unchecked") // No values from static EMPTY class
404  @Override
405  public void accept(CompositeRecordReader.JoinCollector jc, K key)
406      throws IOException, InterruptedException {
407    if (hasNext() && 0 == cmp.compare(key, key())) {
408      fillJoinCollector(createKey());
409      jc.add(id, getDelegate());
410      return;
411    }
412    jc.add(id, EMPTY);
413  }
414
415  /**
416   * For all child RRs offering the key provided, obtain an iterator
417   * at that position in the JoinCollector.
418   */
419  protected void fillJoinCollector(K iterkey) 
420      throws IOException, InterruptedException {
421    if (!q.isEmpty()) {
422      q.peek().key(iterkey);
423      while (0 == cmp.compare(q.peek().key(), iterkey)) {
424        ComposableRecordReader<K,?> t = q.poll();
425        t.accept(jc, iterkey);
426        if (t.hasNext()) {
427          q.add(t);
428        } else if (q.isEmpty()) {
429          return;
430        }
431      }
432    }
433  }
434
435  /**
436   * Implement Comparable contract (compare key of join or head of heap
437   * with that of another).
438   */
439  public int compareTo(ComposableRecordReader<K,?> other) {
440    return cmp.compare(key(), other.key());
441  }
442
443  /**
444   * Create a new key common to all child RRs.
445   * @throws ClassCastException if key classes differ.
446   */
447  @SuppressWarnings("unchecked")
448  protected K createKey() {
449    if (keyclass == null || keyclass.equals(NullWritable.class)) {
450      return (K) NullWritable.get();
451    }
452    return (K) ReflectionUtils.newInstance(keyclass, getConf());
453  }
454
455  /**
456   * Create a value to be used internally for joins.
457   */
458  protected TupleWritable createTupleWritable() {
459    Writable[] vals = new Writable[kids.length];
460    for (int i = 0; i < vals.length; ++i) {
461      vals[i] = kids[i].createValue();
462    }
463    return new TupleWritable(vals);
464  }
465
466  /** {@inheritDoc} */
467  public X getCurrentValue() 
468      throws IOException, InterruptedException {
469    return value;
470  }
471
472  /**
473   * Close all child RRs.
474   */
475  public void close() throws IOException {
476    if (kids != null) {
477      for (RecordReader<K,? extends Writable> rr : kids) {
478        rr.close();
479      }
480    }
481    if (jc != null) {
482      jc.close();
483    }
484  }
485
486  /**
487   * Report progress as the minimum of all child RR progress.
488   */
489  public float getProgress() throws IOException, InterruptedException {
490    float ret = 1.0f;
491    for (RecordReader<K,? extends Writable> rr : kids) {
492      ret = Math.min(ret, rr.getProgress());
493    }
494    return ret;
495  }
496  
497}