001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Comparator;
024import java.util.PriorityQueue;
025
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configurable;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.io.Writable;
031import org.apache.hadoop.io.WritableComparable;
032import org.apache.hadoop.io.WritableComparator;
033import org.apache.hadoop.io.WritableUtils;
034import org.apache.hadoop.mapred.RecordReader;
035import org.apache.hadoop.util.ReflectionUtils;
036
037/**
038 * A RecordReader that can effect joins of RecordReaders sharing a common key
039 * type and partitioning.
040 */
041@InterfaceAudience.Public
042@InterfaceStability.Stable
043public abstract class CompositeRecordReader<
044    K extends WritableComparable, // key type
045    V extends Writable,           // accepts RecordReader<K,V> as children
046    X extends Writable>           // emits Writables of this type
047    implements Configurable {
048
049
050  private int id;
051  private Configuration conf;
052  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
053
054  private WritableComparator cmp;
055  private Class<? extends WritableComparable> keyclass;
056  private PriorityQueue<ComposableRecordReader<K,?>> q;
057
058  protected final JoinCollector jc;
059  protected final ComposableRecordReader<K,? extends V>[] kids;
060
061  protected abstract boolean combine(Object[] srcs, TupleWritable value);
062
063  /**
064   * Create a RecordReader with <tt>capacity</tt> children to position
065   * <tt>id</tt> in the parent reader.
066   * The id of a root CompositeRecordReader is -1 by convention, but relying
067   * on this is not recommended.
068   */
069  @SuppressWarnings("unchecked") // Generic array assignment
070  public CompositeRecordReader(int id, int capacity,
071      Class<? extends WritableComparator> cmpcl)
072      throws IOException {
073    assert capacity > 0 : "Invalid capacity";
074    this.id = id;
075    if (null != cmpcl) {
076      cmp = ReflectionUtils.newInstance(cmpcl, null);
077      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
078          new Comparator<ComposableRecordReader<K,?>>() {
079            public int compare(ComposableRecordReader<K,?> o1,
080                               ComposableRecordReader<K,?> o2) {
081              return cmp.compare(o1.key(), o2.key());
082            }
083          });
084    }
085    jc = new JoinCollector(capacity);
086    kids = new ComposableRecordReader[capacity];
087  }
088
089  /**
090   * Return the position in the collector this class occupies.
091   */
092  public int id() {
093    return id;
094  }
095
096  /**
097   * {@inheritDoc}
098   */
099  public void setConf(Configuration conf) {
100    this.conf = conf;
101  }
102
103  /**
104   * {@inheritDoc}
105   */
106  public Configuration getConf() {
107    return conf;
108  }
109
110  /**
111   * Return sorted list of RecordReaders for this composite.
112   */
113  protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
114    return q;
115  }
116
117  /**
118   * Return comparator defining the ordering for RecordReaders in this
119   * composite.
120   */
121  protected WritableComparator getComparator() {
122    return cmp;
123  }
124
125  /**
126   * Add a RecordReader to this collection.
127   * The id() of a RecordReader determines where in the Tuple its
128   * entry will appear. Adding RecordReaders with the same id has
129   * undefined behavior.
130   */
131  public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
132    kids[rr.id()] = rr;
133    if (null == q) {
134      cmp = WritableComparator.get(rr.createKey().getClass(), conf);
135      q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
136          new Comparator<ComposableRecordReader<K,?>>() {
137            public int compare(ComposableRecordReader<K,?> o1,
138                               ComposableRecordReader<K,?> o2) {
139              return cmp.compare(o1.key(), o2.key());
140            }
141          });
142    }
143    if (rr.hasNext()) {
144      q.add(rr);
145    }
146  }
147
148  /**
149   * Collector for join values.
150   * This accumulates values for a given key from the child RecordReaders. If
151   * one or more child RR contain duplicate keys, this will emit the cross
152   * product of the associated values until exhausted.
153   */
154  class JoinCollector {
155    private K key;
156    private ResetableIterator<X>[] iters;
157    private int pos = -1;
158    private boolean first = true;
159
160    /**
161     * Construct a collector capable of handling the specified number of
162     * children.
163     */
164    @SuppressWarnings("unchecked") // Generic array assignment
165    public JoinCollector(int card) {
166      iters = new ResetableIterator[card];
167      for (int i = 0; i < iters.length; ++i) {
168        iters[i] = EMPTY;
169      }
170    }
171
172    /**
173     * Register a given iterator at position id.
174     */
175    public void add(int id, ResetableIterator<X> i)
176        throws IOException {
177      iters[id] = i;
178    }
179
180    /**
181     * Return the key associated with this collection.
182     */
183    public K key() {
184      return key;
185    }
186
187    /**
188     * Codify the contents of the collector to be iterated over.
189     * When this is called, all RecordReaders registered for this
190     * key should have added ResetableIterators.
191     */
192    public void reset(K key) {
193      this.key = key;
194      first = true;
195      pos = iters.length - 1;
196      for (int i = 0; i < iters.length; ++i) {
197        iters[i].reset();
198      }
199    }
200
201    /**
202     * Clear all state information.
203     */
204    public void clear() {
205      key = null;
206      pos = -1;
207      for (int i = 0; i < iters.length; ++i) {
208        iters[i].clear();
209        iters[i] = EMPTY;
210      }
211    }
212
213    /**
214     * Returns false if exhausted or if reset(K) has not been called.
215     */
216    protected boolean hasNext() {
217      return !(pos < 0);
218    }
219
220    /**
221     * Populate Tuple from iterators.
222     * It should be the case that, given iterators i_1...i_n over values from
223     * sources s_1...s_n sharing key k, repeated calls to next should yield
224     * I x I.
225     */
226    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
227    protected boolean next(TupleWritable val) throws IOException {
228      if (first) {
229        int i = -1;
230        for (pos = 0; pos < iters.length; ++pos) {
231          if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
232            i = pos;
233            val.setWritten(i);
234          }
235        }
236        pos = i;
237        first = false;
238        if (pos < 0) {
239          clear();
240          return false;
241        }
242        return true;
243      }
244      while (0 <= pos && !(iters[pos].hasNext() &&
245                           iters[pos].next((X)val.get(pos)))) {
246        --pos;
247      }
248      if (pos < 0) {
249        clear();
250        return false;
251      }
252      val.setWritten(pos);
253      for (int i = 0; i < pos; ++i) {
254        if (iters[i].replay((X)val.get(i))) {
255          val.setWritten(i);
256        }
257      }
258      while (pos + 1 < iters.length) {
259        ++pos;
260        iters[pos].reset();
261        if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
262          val.setWritten(pos);
263        }
264      }
265      return true;
266    }
267
268    /**
269     * Replay the last Tuple emitted.
270     */
271    @SuppressWarnings("unchecked") // No static typeinfo on Tuples
272    public boolean replay(TupleWritable val) throws IOException {
273      // The last emitted tuple might have drawn on an empty source;
274      // it can't be cleared prematurely, b/c there may be more duplicate
275      // keys in iterator positions < pos
276      assert !first;
277      boolean ret = false;
278      for (int i = 0; i < iters.length; ++i) {
279        if (iters[i].replay((X)val.get(i))) {
280          val.setWritten(i);
281          ret = true;
282        }
283      }
284      return ret;
285    }
286
287    /**
288     * Close all child iterators.
289     */
290    public void close() throws IOException {
291      for (int i = 0; i < iters.length; ++i) {
292        iters[i].close();
293      }
294    }
295
296    /**
297     * Write the next value into key, value as accepted by the operation
298     * associated with this set of RecordReaders.
299     */
300    public boolean flush(TupleWritable value) throws IOException {
301      while (hasNext()) {
302        value.clearWritten();
303        if (next(value) && combine(kids, value)) {
304          return true;
305        }
306      }
307      return false;
308    }
309  }
310
311  /**
312   * Return the key for the current join or the value at the top of the
313   * RecordReader heap.
314   */
315  public K key() {
316    if (jc.hasNext()) {
317      return jc.key();
318    }
319    if (!q.isEmpty()) {
320      return q.peek().key();
321    }
322    return null;
323  }
324
325  /**
326   * Clone the key at the top of this RR into the given object.
327   */
328  public void key(K key) throws IOException {
329    WritableUtils.cloneInto(key, key());
330  }
331
332  /**
333   * Return true if it is possible that this could emit more values.
334   */
335  public boolean hasNext() {
336    return jc.hasNext() || !q.isEmpty();
337  }
338
339  /**
340   * Pass skip key to child RRs.
341   */
342  public void skip(K key) throws IOException {
343    ArrayList<ComposableRecordReader<K,?>> tmp =
344      new ArrayList<ComposableRecordReader<K,?>>();
345    while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
346      tmp.add(q.poll());
347    }
348    for (ComposableRecordReader<K,?> rr : tmp) {
349      rr.skip(key);
350      if (rr.hasNext()) {
351        q.add(rr);
352      }
353    }
354  }
355
356  /**
357   * Obtain an iterator over the child RRs apropos of the value type
358   * ultimately emitted from this join.
359   */
360  protected abstract ResetableIterator<X> getDelegate();
361
362  /**
363   * If key provided matches that of this Composite, give JoinCollector
364   * iterator over values it may emit.
365   */
366  @SuppressWarnings("unchecked") // No values from static EMPTY class
367  public void accept(CompositeRecordReader.JoinCollector jc, K key)
368      throws IOException {
369    if (hasNext() && 0 == cmp.compare(key, key())) {
370      fillJoinCollector(createKey());
371      jc.add(id, getDelegate());
372      return;
373    }
374    jc.add(id, EMPTY);
375  }
376
377  /**
378   * For all child RRs offering the key provided, obtain an iterator
379   * at that position in the JoinCollector.
380   */
381  protected void fillJoinCollector(K iterkey) throws IOException {
382    if (!q.isEmpty()) {
383      q.peek().key(iterkey);
384      while (0 == cmp.compare(q.peek().key(), iterkey)) {
385        ComposableRecordReader<K,?> t = q.poll();
386        t.accept(jc, iterkey);
387        if (t.hasNext()) {
388          q.add(t);
389        } else if (q.isEmpty()) {
390          return;
391        }
392      }
393    }
394  }
395
396  /**
397   * Implement Comparable contract (compare key of join or head of heap
398   * with that of another).
399   */
400  public int compareTo(ComposableRecordReader<K,?> other) {
401    return cmp.compare(key(), other.key());
402  }
403
404  /**
405   * Create a new key value common to all child RRs.
406   * @throws ClassCastException if key classes differ.
407   */
408  @SuppressWarnings("unchecked") // Explicit check for key class agreement
409  public K createKey() {
410    if (null == keyclass) {
411      final Class<?> cls = kids[0].createKey().getClass();
412      for (RecordReader<K,? extends Writable> rr : kids) {
413        if (!cls.equals(rr.createKey().getClass())) {
414          throw new ClassCastException("Child key classes fail to agree");
415        }
416      }
417      keyclass = cls.asSubclass(WritableComparable.class);
418    }
419    return (K) ReflectionUtils.newInstance(keyclass, getConf());
420  }
421
422  /**
423   * Create a value to be used internally for joins.
424   */
425  protected TupleWritable createInternalValue() {
426    Writable[] vals = new Writable[kids.length];
427    for (int i = 0; i < vals.length; ++i) {
428      vals[i] = kids[i].createValue();
429    }
430    return new TupleWritable(vals);
431  }
432
433  /**
434   * Unsupported (returns zero in all cases).
435   */
436  public long getPos() throws IOException {
437    return 0;
438  }
439
440  /**
441   * Close all child RRs.
442   */
443  public void close() throws IOException {
444    if (kids != null) {
445      for (RecordReader<K,? extends Writable> rr : kids) {
446        rr.close();
447      }
448    }
449    if (jc != null) {
450      jc.close();
451    }
452  }
453
454  /**
455   * Report progress as the minimum of all child RR progress.
456   */
457  public float getProgress() throws IOException {
458    float ret = 1.0f;
459    for (RecordReader<K,? extends Writable> rr : kids) {
460      ret = Math.min(ret, rr.getProgress());
461    }
462    return ret;
463  }
464}