001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Comparator;
024    import java.util.PriorityQueue;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configurable;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.io.NullWritable;
031    import org.apache.hadoop.io.Writable;
032    import org.apache.hadoop.io.WritableComparable;
033    import org.apache.hadoop.io.WritableComparator;
034    import org.apache.hadoop.mapreduce.InputSplit;
035    import org.apache.hadoop.mapreduce.RecordReader;
036    import org.apache.hadoop.mapreduce.TaskAttemptContext;
037    import org.apache.hadoop.util.ReflectionUtils;
038    
039    /**
040     * A RecordReader that can effect joins of RecordReaders sharing a common key
041     * type and partitioning.
042     */
043    @InterfaceAudience.Public
044    @InterfaceStability.Stable
045    public abstract class CompositeRecordReader<
046        K extends WritableComparable<?>, // key type
047        V extends Writable,  // accepts RecordReader<K,V> as children
048        X extends Writable>  // emits Writables of this type
049        extends ComposableRecordReader<K, X>
050        implements Configurable {
051    
052      private int id;
053      protected Configuration conf;
054      private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
055    
056      private WritableComparator cmp;
057      @SuppressWarnings("unchecked")
058      protected Class<? extends WritableComparable> keyclass = null;
059      private PriorityQueue<ComposableRecordReader<K,?>> q;
060    
061      protected final JoinCollector jc;
062      protected final ComposableRecordReader<K,? extends V>[] kids;
063    
064      protected abstract boolean combine(Object[] srcs, TupleWritable value);
065      
066      protected K key;
067      protected X value;
068    
069      /**
070       * Create a RecordReader with <tt>capacity</tt> children to position
071       * <tt>id</tt> in the parent reader.
072       * The id of a root CompositeRecordReader is -1 by convention, but relying
073       * on this is not recommended.
074       */
075      @SuppressWarnings("unchecked") // Generic array assignment
076      public CompositeRecordReader(int id, int capacity,
077          Class<? extends WritableComparator> cmpcl)
078          throws IOException {
079        assert capacity > 0 : "Invalid capacity";
080        this.id = id;
081        if (null != cmpcl) {
082          cmp = ReflectionUtils.newInstance(cmpcl, null);
083          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
084                new Comparator<ComposableRecordReader<K,?>>() {
085                  public int compare(ComposableRecordReader<K,?> o1,
086                                     ComposableRecordReader<K,?> o2) {
087                    return cmp.compare(o1.key(), o2.key());
088                  }
089                });
090        }
091        jc = new JoinCollector(capacity);
092        kids = new ComposableRecordReader[capacity];
093      }
094    
095      @SuppressWarnings("unchecked")
096      public void initialize(InputSplit split, TaskAttemptContext context) 
097          throws IOException, InterruptedException {
098        if (kids != null) {
099          for (int i = 0; i < kids.length; ++i) {
100            kids[i].initialize(((CompositeInputSplit)split).get(i), context);
101            if (kids[i].key() == null) {
102              continue;
103            }
104            
105            // get keyclass
106            if (keyclass == null) {
107              keyclass = kids[i].createKey().getClass().
108                asSubclass(WritableComparable.class);
109            }
110            // create priority queue
111            if (null == q) {
112              cmp = WritableComparator.get(keyclass);
113              q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
114                    new Comparator<ComposableRecordReader<K,?>>() {
115                      public int compare(ComposableRecordReader<K,?> o1,
116                                         ComposableRecordReader<K,?> o2) {
117                        return cmp.compare(o1.key(), o2.key());
118                      }
119                    });
120            }
121            // Explicit check for key class agreement
122            if (!keyclass.equals(kids[i].key().getClass())) {
123              throw new ClassCastException("Child key classes fail to agree");
124            }
125            
126            // add the kid to priority queue if it has any elements
127            if (kids[i].hasNext()) {
128              q.add(kids[i]);
129            }
130          }
131        }
132      }
133    
134      /**
135       * Return the position in the collector this class occupies.
136       */
137      public int id() {
138        return id;
139      }
140    
141      /**
142       * {@inheritDoc}
143       */
144      public void setConf(Configuration conf) {
145        this.conf = conf;
146      }
147    
148      /**
149       * {@inheritDoc}
150       */
151      public Configuration getConf() {
152        return conf;
153      }
154    
155      /**
156       * Return sorted list of RecordReaders for this composite.
157       */
158      protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
159        return q;
160      }
161    
162      /**
163       * Return comparator defining the ordering for RecordReaders in this
164       * composite.
165       */
166      protected WritableComparator getComparator() {
167        return cmp;
168      }
169    
170      /**
171       * Add a RecordReader to this collection.
172       * The id() of a RecordReader determines where in the Tuple its
173       * entry will appear. Adding RecordReaders with the same id has
174       * undefined behavior.
175       */
176      public void add(ComposableRecordReader<K,? extends V> rr) 
177          throws IOException, InterruptedException {
178        kids[rr.id()] = rr;
179      }
180    
181      /**
182       * Collector for join values.
183       * This accumulates values for a given key from the child RecordReaders. If
184       * one or more child RR contain duplicate keys, this will emit the cross
185       * product of the associated values until exhausted.
186       */
187      public class JoinCollector {
188        private K key;
189        private ResetableIterator<X>[] iters;
190        private int pos = -1;
191        private boolean first = true;
192    
193        /**
194         * Construct a collector capable of handling the specified number of
195         * children.
196         */
197        @SuppressWarnings("unchecked") // Generic array assignment
198        public JoinCollector(int card) {
199          iters = new ResetableIterator[card];
200          for (int i = 0; i < iters.length; ++i) {
201            iters[i] = EMPTY;
202          }
203        }
204    
205        /**
206         * Register a given iterator at position id.
207         */
208        public void add(int id, ResetableIterator<X> i)
209            throws IOException {
210          iters[id] = i;
211        }
212    
213        /**
214         * Return the key associated with this collection.
215         */
216        public K key() {
217          return key;
218        }
219    
220        /**
221         * Codify the contents of the collector to be iterated over.
222         * When this is called, all RecordReaders registered for this
223         * key should have added ResetableIterators.
224         */
225        public void reset(K key) {
226          this.key = key;
227          first = true;
228          pos = iters.length - 1;
229          for (int i = 0; i < iters.length; ++i) {
230            iters[i].reset();
231          }
232        }
233    
234        /**
235         * Clear all state information.
236         */
237        public void clear() {
238          key = null;
239          pos = -1;
240          for (int i = 0; i < iters.length; ++i) {
241            iters[i].clear();
242            iters[i] = EMPTY;
243          }
244        }
245    
246        /**
247         * Returns false if exhausted or if reset(K) has not been called.
248         */
249        public boolean hasNext() {
250          return !(pos < 0);
251        }
252    
253        /**
254         * Populate Tuple from iterators.
255         * It should be the case that, given iterators i_1...i_n over values from
256         * sources s_1...s_n sharing key k, repeated calls to next should yield
257         * I x I.
258         */
259        @SuppressWarnings("unchecked") // No static type info on Tuples
260        protected boolean next(TupleWritable val) throws IOException {
261          if (first) {
262            int i = -1;
263            for (pos = 0; pos < iters.length; ++pos) {
264              if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
265                i = pos;
266                val.setWritten(i);
267              }
268            }
269            pos = i;
270            first = false;
271            if (pos < 0) {
272              clear();
273              return false;
274            }
275            return true;
276          }
277          while (0 <= pos && !(iters[pos].hasNext() &&
278                               iters[pos].next((X)val.get(pos)))) {
279            --pos;
280          }
281          if (pos < 0) {
282            clear();
283            return false;
284          }
285          val.setWritten(pos);
286          for (int i = 0; i < pos; ++i) {
287            if (iters[i].replay((X)val.get(i))) {
288              val.setWritten(i);
289            }
290          }
291          while (pos + 1 < iters.length) {
292            ++pos;
293            iters[pos].reset();
294            if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
295              val.setWritten(pos);
296            }
297          }
298          return true;
299        }
300    
301        /**
302         * Replay the last Tuple emitted.
303         */
304        @SuppressWarnings("unchecked") // No static typeinfo on Tuples
305        public boolean replay(TupleWritable val) throws IOException {
306          // The last emitted tuple might have drawn on an empty source;
307          // it can't be cleared prematurely, b/c there may be more duplicate
308          // keys in iterator positions < pos
309          assert !first;
310          boolean ret = false;
311          for (int i = 0; i < iters.length; ++i) {
312            if (iters[i].replay((X)val.get(i))) {
313              val.setWritten(i);
314              ret = true;
315            }
316          }
317          return ret;
318        }
319    
320        /**
321         * Close all child iterators.
322         */
323        public void close() throws IOException {
324          for (int i = 0; i < iters.length; ++i) {
325            iters[i].close();
326          }
327        }
328    
329        /**
330         * Write the next value into key, value as accepted by the operation
331         * associated with this set of RecordReaders.
332         */
333        public boolean flush(TupleWritable value) throws IOException {
334          while (hasNext()) {
335            value.clearWritten();
336            if (next(value) && combine(kids, value)) {
337              return true;
338            }
339          }
340          return false;
341        }
342      }
343    
344      /**
345       * Return the key for the current join or the value at the top of the
346       * RecordReader heap.
347       */
348      public K key() {
349        if (jc.hasNext()) {
350          return jc.key();
351        }
352        if (!q.isEmpty()) {
353          return q.peek().key();
354        }
355        return null;
356      }
357    
358      /**
359       * Clone the key at the top of this RR into the given object.
360       */
361      public void key(K key) throws IOException {
362        ReflectionUtils.copy(conf, key(), key);
363      }
364    
365      public K getCurrentKey() {
366        return key;
367      }
368      
369      /**
370       * Return true if it is possible that this could emit more values.
371       */
372      public boolean hasNext() {
373        return jc.hasNext() || !q.isEmpty();
374      }
375    
376      /**
377       * Pass skip key to child RRs.
378       */
379      public void skip(K key) throws IOException, InterruptedException {
380        ArrayList<ComposableRecordReader<K,?>> tmp =
381          new ArrayList<ComposableRecordReader<K,?>>();
382        while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
383          tmp.add(q.poll());
384        }
385        for (ComposableRecordReader<K,?> rr : tmp) {
386          rr.skip(key);
387          if (rr.hasNext()) {
388            q.add(rr);
389          }
390        }
391      }
392    
393      /**
394       * Obtain an iterator over the child RRs apropos of the value type
395       * ultimately emitted from this join.
396       */
397      protected abstract ResetableIterator<X> getDelegate();
398    
399      /**
400       * If key provided matches that of this Composite, give JoinCollector
401       * iterator over values it may emit.
402       */
403      @SuppressWarnings("unchecked") // No values from static EMPTY class
404      @Override
405      public void accept(CompositeRecordReader.JoinCollector jc, K key)
406          throws IOException, InterruptedException {
407        if (hasNext() && 0 == cmp.compare(key, key())) {
408          fillJoinCollector(createKey());
409          jc.add(id, getDelegate());
410          return;
411        }
412        jc.add(id, EMPTY);
413      }
414    
415      /**
416       * For all child RRs offering the key provided, obtain an iterator
417       * at that position in the JoinCollector.
418       */
419      protected void fillJoinCollector(K iterkey) 
420          throws IOException, InterruptedException {
421        if (!q.isEmpty()) {
422          q.peek().key(iterkey);
423          while (0 == cmp.compare(q.peek().key(), iterkey)) {
424            ComposableRecordReader<K,?> t = q.poll();
425            t.accept(jc, iterkey);
426            if (t.hasNext()) {
427              q.add(t);
428            } else if (q.isEmpty()) {
429              return;
430            }
431          }
432        }
433      }
434    
435      /**
436       * Implement Comparable contract (compare key of join or head of heap
437       * with that of another).
438       */
439      public int compareTo(ComposableRecordReader<K,?> other) {
440        return cmp.compare(key(), other.key());
441      }
442    
443      /**
444       * Create a new key common to all child RRs.
445       * @throws ClassCastException if key classes differ.
446       */
447      @SuppressWarnings("unchecked")
448      protected K createKey() {
449        if (keyclass == null || keyclass.equals(NullWritable.class)) {
450          return (K) NullWritable.get();
451        }
452        return (K) ReflectionUtils.newInstance(keyclass, getConf());
453      }
454    
455      /**
456       * Create a value to be used internally for joins.
457       */
458      protected TupleWritable createTupleWritable() {
459        Writable[] vals = new Writable[kids.length];
460        for (int i = 0; i < vals.length; ++i) {
461          vals[i] = kids[i].createValue();
462        }
463        return new TupleWritable(vals);
464      }
465    
466      /** {@inheritDoc} */
467      public X getCurrentValue() 
468          throws IOException, InterruptedException {
469        return value;
470      }
471    
472      /**
473       * Close all child RRs.
474       */
475      public void close() throws IOException {
476        if (kids != null) {
477          for (RecordReader<K,? extends Writable> rr : kids) {
478            rr.close();
479          }
480        }
481        if (jc != null) {
482          jc.close();
483        }
484      }
485    
486      /**
487       * Report progress as the minimum of all child RR progress.
488       */
489      public float getProgress() throws IOException, InterruptedException {
490        float ret = 1.0f;
491        for (RecordReader<K,? extends Writable> rr : kids) {
492          ret = Math.min(ret, rr.getProgress());
493        }
494        return ret;
495      }
496      
497    }