001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.join;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Comparator;
024    import java.util.PriorityQueue;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configurable;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.io.Writable;
031    import org.apache.hadoop.io.WritableComparable;
032    import org.apache.hadoop.io.WritableComparator;
033    import org.apache.hadoop.io.WritableUtils;
034    import org.apache.hadoop.mapred.RecordReader;
035    import org.apache.hadoop.util.ReflectionUtils;
036    
037    /**
038     * A RecordReader that can effect joins of RecordReaders sharing a common key
039     * type and partitioning.
040     */
041    @InterfaceAudience.Public
042    @InterfaceStability.Stable
043    public abstract class CompositeRecordReader<
044        K extends WritableComparable, // key type
045        V extends Writable,           // accepts RecordReader<K,V> as children
046        X extends Writable>           // emits Writables of this type
047        implements Configurable {
048    
049    
050      private int id;
051      private Configuration conf;
052      private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
053    
054      private WritableComparator cmp;
055      private Class<? extends WritableComparable> keyclass;
056      private PriorityQueue<ComposableRecordReader<K,?>> q;
057    
058      protected final JoinCollector jc;
059      protected final ComposableRecordReader<K,? extends V>[] kids;
060    
061      protected abstract boolean combine(Object[] srcs, TupleWritable value);
062    
063      /**
064       * Create a RecordReader with <tt>capacity</tt> children to position
065       * <tt>id</tt> in the parent reader.
066       * The id of a root CompositeRecordReader is -1 by convention, but relying
067       * on this is not recommended.
068       */
069      @SuppressWarnings("unchecked") // Generic array assignment
070      public CompositeRecordReader(int id, int capacity,
071          Class<? extends WritableComparator> cmpcl)
072          throws IOException {
073        assert capacity > 0 : "Invalid capacity";
074        this.id = id;
075        if (null != cmpcl) {
076          cmp = ReflectionUtils.newInstance(cmpcl, null);
077          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
078              new Comparator<ComposableRecordReader<K,?>>() {
079                public int compare(ComposableRecordReader<K,?> o1,
080                                   ComposableRecordReader<K,?> o2) {
081                  return cmp.compare(o1.key(), o2.key());
082                }
083              });
084        }
085        jc = new JoinCollector(capacity);
086        kids = new ComposableRecordReader[capacity];
087      }
088    
089      /**
090       * Return the position in the collector this class occupies.
091       */
092      public int id() {
093        return id;
094      }
095    
096      /**
097       * {@inheritDoc}
098       */
099      public void setConf(Configuration conf) {
100        this.conf = conf;
101      }
102    
103      /**
104       * {@inheritDoc}
105       */
106      public Configuration getConf() {
107        return conf;
108      }
109    
110      /**
111       * Return sorted list of RecordReaders for this composite.
112       */
113      protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
114        return q;
115      }
116    
117      /**
118       * Return comparator defining the ordering for RecordReaders in this
119       * composite.
120       */
121      protected WritableComparator getComparator() {
122        return cmp;
123      }
124    
125      /**
126       * Add a RecordReader to this collection.
127       * The id() of a RecordReader determines where in the Tuple its
128       * entry will appear. Adding RecordReaders with the same id has
129       * undefined behavior.
130       */
131      public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
132        kids[rr.id()] = rr;
133        if (null == q) {
134          cmp = WritableComparator.get(rr.createKey().getClass());
135          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
136              new Comparator<ComposableRecordReader<K,?>>() {
137                public int compare(ComposableRecordReader<K,?> o1,
138                                   ComposableRecordReader<K,?> o2) {
139                  return cmp.compare(o1.key(), o2.key());
140                }
141              });
142        }
143        if (rr.hasNext()) {
144          q.add(rr);
145        }
146      }
147    
148      /**
149       * Collector for join values.
150       * This accumulates values for a given key from the child RecordReaders. If
151       * one or more child RR contain duplicate keys, this will emit the cross
152       * product of the associated values until exhausted.
153       */
154      class JoinCollector {
155        private K key;
156        private ResetableIterator<X>[] iters;
157        private int pos = -1;
158        private boolean first = true;
159    
160        /**
161         * Construct a collector capable of handling the specified number of
162         * children.
163         */
164        @SuppressWarnings("unchecked") // Generic array assignment
165        public JoinCollector(int card) {
166          iters = new ResetableIterator[card];
167          for (int i = 0; i < iters.length; ++i) {
168            iters[i] = EMPTY;
169          }
170        }
171    
172        /**
173         * Register a given iterator at position id.
174         */
175        public void add(int id, ResetableIterator<X> i)
176            throws IOException {
177          iters[id] = i;
178        }
179    
180        /**
181         * Return the key associated with this collection.
182         */
183        public K key() {
184          return key;
185        }
186    
187        /**
188         * Codify the contents of the collector to be iterated over.
189         * When this is called, all RecordReaders registered for this
190         * key should have added ResetableIterators.
191         */
192        public void reset(K key) {
193          this.key = key;
194          first = true;
195          pos = iters.length - 1;
196          for (int i = 0; i < iters.length; ++i) {
197            iters[i].reset();
198          }
199        }
200    
201        /**
202         * Clear all state information.
203         */
204        public void clear() {
205          key = null;
206          pos = -1;
207          for (int i = 0; i < iters.length; ++i) {
208            iters[i].clear();
209            iters[i] = EMPTY;
210          }
211        }
212    
213        /**
214         * Returns false if exhausted or if reset(K) has not been called.
215         */
216        protected boolean hasNext() {
217          return !(pos < 0);
218        }
219    
220        /**
221         * Populate Tuple from iterators.
222         * It should be the case that, given iterators i_1...i_n over values from
223         * sources s_1...s_n sharing key k, repeated calls to next should yield
224         * I x I.
225         */
226        @SuppressWarnings("unchecked") // No static typeinfo on Tuples
227        protected boolean next(TupleWritable val) throws IOException {
228          if (first) {
229            int i = -1;
230            for (pos = 0; pos < iters.length; ++pos) {
231              if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
232                i = pos;
233                val.setWritten(i);
234              }
235            }
236            pos = i;
237            first = false;
238            if (pos < 0) {
239              clear();
240              return false;
241            }
242            return true;
243          }
244          while (0 <= pos && !(iters[pos].hasNext() &&
245                               iters[pos].next((X)val.get(pos)))) {
246            --pos;
247          }
248          if (pos < 0) {
249            clear();
250            return false;
251          }
252          val.setWritten(pos);
253          for (int i = 0; i < pos; ++i) {
254            if (iters[i].replay((X)val.get(i))) {
255              val.setWritten(i);
256            }
257          }
258          while (pos + 1 < iters.length) {
259            ++pos;
260            iters[pos].reset();
261            if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
262              val.setWritten(pos);
263            }
264          }
265          return true;
266        }
267    
268        /**
269         * Replay the last Tuple emitted.
270         */
271        @SuppressWarnings("unchecked") // No static typeinfo on Tuples
272        public boolean replay(TupleWritable val) throws IOException {
273          // The last emitted tuple might have drawn on an empty source;
274          // it can't be cleared prematurely, b/c there may be more duplicate
275          // keys in iterator positions < pos
276          assert !first;
277          boolean ret = false;
278          for (int i = 0; i < iters.length; ++i) {
279            if (iters[i].replay((X)val.get(i))) {
280              val.setWritten(i);
281              ret = true;
282            }
283          }
284          return ret;
285        }
286    
287        /**
288         * Close all child iterators.
289         */
290        public void close() throws IOException {
291          for (int i = 0; i < iters.length; ++i) {
292            iters[i].close();
293          }
294        }
295    
296        /**
297         * Write the next value into key, value as accepted by the operation
298         * associated with this set of RecordReaders.
299         */
300        public boolean flush(TupleWritable value) throws IOException {
301          while (hasNext()) {
302            value.clearWritten();
303            if (next(value) && combine(kids, value)) {
304              return true;
305            }
306          }
307          return false;
308        }
309      }
310    
311      /**
312       * Return the key for the current join or the value at the top of the
313       * RecordReader heap.
314       */
315      public K key() {
316        if (jc.hasNext()) {
317          return jc.key();
318        }
319        if (!q.isEmpty()) {
320          return q.peek().key();
321        }
322        return null;
323      }
324    
325      /**
326       * Clone the key at the top of this RR into the given object.
327       */
328      public void key(K key) throws IOException {
329        WritableUtils.cloneInto(key, key());
330      }
331    
332      /**
333       * Return true if it is possible that this could emit more values.
334       */
335      public boolean hasNext() {
336        return jc.hasNext() || !q.isEmpty();
337      }
338    
339      /**
340       * Pass skip key to child RRs.
341       */
342      public void skip(K key) throws IOException {
343        ArrayList<ComposableRecordReader<K,?>> tmp =
344          new ArrayList<ComposableRecordReader<K,?>>();
345        while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
346          tmp.add(q.poll());
347        }
348        for (ComposableRecordReader<K,?> rr : tmp) {
349          rr.skip(key);
350          if (rr.hasNext()) {
351            q.add(rr);
352          }
353        }
354      }
355    
356      /**
357       * Obtain an iterator over the child RRs apropos of the value type
358       * ultimately emitted from this join.
359       */
360      protected abstract ResetableIterator<X> getDelegate();
361    
362      /**
363       * If key provided matches that of this Composite, give JoinCollector
364       * iterator over values it may emit.
365       */
366      @SuppressWarnings("unchecked") // No values from static EMPTY class
367      public void accept(CompositeRecordReader.JoinCollector jc, K key)
368          throws IOException {
369        if (hasNext() && 0 == cmp.compare(key, key())) {
370          fillJoinCollector(createKey());
371          jc.add(id, getDelegate());
372          return;
373        }
374        jc.add(id, EMPTY);
375      }
376    
377      /**
378       * For all child RRs offering the key provided, obtain an iterator
379       * at that position in the JoinCollector.
380       */
381      protected void fillJoinCollector(K iterkey) throws IOException {
382        if (!q.isEmpty()) {
383          q.peek().key(iterkey);
384          while (0 == cmp.compare(q.peek().key(), iterkey)) {
385            ComposableRecordReader<K,?> t = q.poll();
386            t.accept(jc, iterkey);
387            if (t.hasNext()) {
388              q.add(t);
389            } else if (q.isEmpty()) {
390              return;
391            }
392          }
393        }
394      }
395    
396      /**
397       * Implement Comparable contract (compare key of join or head of heap
398       * with that of another).
399       */
400      public int compareTo(ComposableRecordReader<K,?> other) {
401        return cmp.compare(key(), other.key());
402      }
403    
404      /**
405       * Create a new key value common to all child RRs.
406       * @throws ClassCastException if key classes differ.
407       */
408      @SuppressWarnings("unchecked") // Explicit check for key class agreement
409      public K createKey() {
410        if (null == keyclass) {
411          final Class<?> cls = kids[0].createKey().getClass();
412          for (RecordReader<K,? extends Writable> rr : kids) {
413            if (!cls.equals(rr.createKey().getClass())) {
414              throw new ClassCastException("Child key classes fail to agree");
415            }
416          }
417          keyclass = cls.asSubclass(WritableComparable.class);
418        }
419        return (K) ReflectionUtils.newInstance(keyclass, getConf());
420      }
421    
422      /**
423       * Create a value to be used internally for joins.
424       */
425      protected TupleWritable createInternalValue() {
426        Writable[] vals = new Writable[kids.length];
427        for (int i = 0; i < vals.length; ++i) {
428          vals[i] = kids[i].createValue();
429        }
430        return new TupleWritable(vals);
431      }
432    
433      /**
434       * Unsupported (returns zero in all cases).
435       */
436      public long getPos() throws IOException {
437        return 0;
438      }
439    
440      /**
441       * Close all child RRs.
442       */
443      public void close() throws IOException {
444        if (kids != null) {
445          for (RecordReader<K,? extends Writable> rr : kids) {
446            rr.close();
447          }
448        }
449        if (jc != null) {
450          jc.close();
451        }
452      }
453    
454      /**
455       * Report progress as the minimum of all child RR progress.
456       */
457      public float getProgress() throws IOException {
458        float ret = 1.0f;
459        for (RecordReader<K,? extends Writable> rr : kids) {
460          ret = Math.min(ret, rr.getProgress());
461        }
462        return ret;
463      }
464    }