001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.join;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.Writable;
026    import org.apache.hadoop.io.WritableComparable;
027    import org.apache.hadoop.io.WritableComparator;
028    import org.apache.hadoop.io.WritableUtils;
029    import org.apache.hadoop.mapred.RecordReader;
030    
031    /**
032     * Proxy class for a RecordReader participating in the join framework.
033     * This class keeps track of the "head" key-value pair for the
034     * provided RecordReader and keeps a store of values matching a key when
035     * this source is participating in a join.
036     */
037    @InterfaceAudience.Public
038    @InterfaceStability.Stable
039    public class WrappedRecordReader<K extends WritableComparable,
040                              U extends Writable>
041        implements ComposableRecordReader<K,U> {
042    
043      private boolean empty = false;
044      private RecordReader<K,U> rr;
045      private int id;  // index at which values will be inserted in collector
046    
047      private K khead; // key at the top of this RR
048      private U vhead; // value assoc with khead
049      private WritableComparator cmp;
050    
051      private ResetableIterator<U> vjoin;
052    
053      /**
054       * For a given RecordReader rr, occupy position id in collector.
055       */
056      WrappedRecordReader(int id, RecordReader<K,U> rr,
057          Class<? extends WritableComparator> cmpcl) throws IOException {
058        this.id = id;
059        this.rr = rr;
060        khead = rr.createKey();
061        vhead = rr.createValue();
062        try {
063          cmp = (null == cmpcl)
064            ? WritableComparator.get(khead.getClass())
065            : cmpcl.newInstance();
066        } catch (InstantiationException e) {
067          throw (IOException)new IOException().initCause(e);
068        } catch (IllegalAccessException e) {
069          throw (IOException)new IOException().initCause(e);
070        }
071        vjoin = new StreamBackedIterator<U>();
072        next();
073      }
074    
075      /** {@inheritDoc} */
076      public int id() {
077        return id;
078      }
079    
080      /**
081       * Return the key at the head of this RR.
082       */
083      public K key() {
084        return khead;
085      }
086    
087      /**
088       * Clone the key at the head of this RR into the object supplied.
089       */
090      public void key(K qkey) throws IOException {
091        WritableUtils.cloneInto(qkey, khead);
092      }
093    
094      /**
095       * Return true if the RR- including the k,v pair stored in this object-
096       * is exhausted.
097       */
098      public boolean hasNext() {
099        return !empty;
100      }
101    
102      /**
103       * Skip key-value pairs with keys less than or equal to the key provided.
104       */
105      public void skip(K key) throws IOException {
106        if (hasNext()) {
107          while (cmp.compare(khead, key) <= 0 && next());
108        }
109      }
110    
111      /**
112       * Read the next k,v pair into the head of this object; return true iff
113       * the RR and this are exhausted.
114       */
115      protected boolean next() throws IOException {
116        empty = !rr.next(khead, vhead);
117        return hasNext();
118      }
119    
120      /**
121       * Add an iterator to the collector at the position occupied by this
122       * RecordReader over the values in this stream paired with the key
123       * provided (ie register a stream of values from this source matching K
124       * with a collector).
125       */
126                                     // JoinCollector comes from parent, which has
127      @SuppressWarnings("unchecked") // no static type for the slot this sits in
128      public void accept(CompositeRecordReader.JoinCollector i, K key)
129          throws IOException {
130        vjoin.clear();
131        if (0 == cmp.compare(key, khead)) {
132          do {
133            vjoin.add(vhead);
134          } while (next() && 0 == cmp.compare(key, khead));
135        }
136        i.add(id, vjoin);
137      }
138    
139      /**
140       * Write key-value pair at the head of this stream to the objects provided;
141       * get next key-value pair from proxied RR.
142       */
143      public boolean next(K key, U value) throws IOException {
144        if (hasNext()) {
145          WritableUtils.cloneInto(key, khead);
146          WritableUtils.cloneInto(value, vhead);
147          next();
148          return true;
149        }
150        return false;
151      }
152    
153      /**
154       * Request new key from proxied RR.
155       */
156      public K createKey() {
157        return rr.createKey();
158      }
159    
160      /**
161       * Request new value from proxied RR.
162       */
163      public U createValue() {
164        return rr.createValue();
165      }
166    
167      /**
168       * Request progress from proxied RR.
169       */
170      public float getProgress() throws IOException {
171        return rr.getProgress();
172      }
173    
174      /**
175       * Request position from proxied RR.
176       */
177      public long getPos() throws IOException {
178        return rr.getPos();
179      }
180    
181      /**
182       * Forward close request to proxied RR.
183       */
184      public void close() throws IOException {
185        rr.close();
186      }
187    
188      /**
189       * Implement Comparable contract (compare key at head of proxied RR
190       * with that of another).
191       */
192      public int compareTo(ComposableRecordReader<K,?> other) {
193        return cmp.compare(key(), other.key());
194      }
195    
196      /**
197       * Return true iff compareTo(other) retn true.
198       */
199      @SuppressWarnings("unchecked") // Explicit type check prior to cast
200      public boolean equals(Object other) {
201        return other instanceof ComposableRecordReader
202            && 0 == compareTo((ComposableRecordReader)other);
203      }
204    
205      public int hashCode() {
206        assert false : "hashCode not designed";
207        return 42;
208      }
209    
210    }