001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.join;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configurable;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.io.Writable;
028    import org.apache.hadoop.io.WritableComparable;
029    import org.apache.hadoop.io.WritableComparator;
030    import org.apache.hadoop.io.WritableUtils;
031    import org.apache.hadoop.mapred.RecordReader;
032    
033    /**
034     * Proxy class for a RecordReader participating in the join framework.
035     * This class keeps track of the "head" key-value pair for the
036     * provided RecordReader and keeps a store of values matching a key when
037     * this source is participating in a join.
038     */
039    @InterfaceAudience.Public
040    @InterfaceStability.Stable
041    public class WrappedRecordReader<K extends WritableComparable,
042                              U extends Writable>
043        implements ComposableRecordReader<K,U>, Configurable {
044    
045      private boolean empty = false;
046      private RecordReader<K,U> rr;
047      private int id;  // index at which values will be inserted in collector
048    
049      private K khead; // key at the top of this RR
050      private U vhead; // value assoc with khead
051      private WritableComparator cmp;
052      private Configuration conf;
053    
054      private ResetableIterator<U> vjoin;
055    
056      /**
057       * For a given RecordReader rr, occupy position id in collector.
058       */
059      WrappedRecordReader(int id, RecordReader<K,U> rr,
060          Class<? extends WritableComparator> cmpcl) throws IOException {
061        this(id, rr, cmpcl, null);
062      }
063    
064      WrappedRecordReader(int id, RecordReader<K,U> rr,
065                          Class<? extends WritableComparator> cmpcl,
066                          Configuration conf) throws IOException {
067        this.id = id;
068        this.rr = rr;
069        this.conf = (conf == null) ? new Configuration() : conf;
070        khead = rr.createKey();
071        vhead = rr.createValue();
072        try {
073          cmp = (null == cmpcl)
074            ? WritableComparator.get(khead.getClass(), this.conf)
075            : cmpcl.newInstance();
076        } catch (InstantiationException e) {
077          throw (IOException)new IOException().initCause(e);
078        } catch (IllegalAccessException e) {
079          throw (IOException)new IOException().initCause(e);
080        }
081        vjoin = new StreamBackedIterator<U>();
082        next();
083      }
084    
085      /** {@inheritDoc} */
086      public int id() {
087        return id;
088      }
089    
090      /**
091       * Return the key at the head of this RR.
092       */
093      public K key() {
094        return khead;
095      }
096    
097      /**
098       * Clone the key at the head of this RR into the object supplied.
099       */
100      public void key(K qkey) throws IOException {
101        WritableUtils.cloneInto(qkey, khead);
102      }
103    
104      /**
105       * Return true if the RR- including the k,v pair stored in this object-
106       * is exhausted.
107       */
108      public boolean hasNext() {
109        return !empty;
110      }
111    
112      /**
113       * Skip key-value pairs with keys less than or equal to the key provided.
114       */
115      public void skip(K key) throws IOException {
116        if (hasNext()) {
117          while (cmp.compare(khead, key) <= 0 && next());
118        }
119      }
120    
121      /**
122       * Read the next k,v pair into the head of this object; return true iff
123       * the RR and this are exhausted.
124       */
125      protected boolean next() throws IOException {
126        empty = !rr.next(khead, vhead);
127        return hasNext();
128      }
129    
130      /**
131       * Add an iterator to the collector at the position occupied by this
132       * RecordReader over the values in this stream paired with the key
133       * provided (ie register a stream of values from this source matching K
134       * with a collector).
135       */
136                                     // JoinCollector comes from parent, which has
137      @SuppressWarnings("unchecked") // no static type for the slot this sits in
138      public void accept(CompositeRecordReader.JoinCollector i, K key)
139          throws IOException {
140        vjoin.clear();
141        if (0 == cmp.compare(key, khead)) {
142          do {
143            vjoin.add(vhead);
144          } while (next() && 0 == cmp.compare(key, khead));
145        }
146        i.add(id, vjoin);
147      }
148    
149      /**
150       * Write key-value pair at the head of this stream to the objects provided;
151       * get next key-value pair from proxied RR.
152       */
153      public boolean next(K key, U value) throws IOException {
154        if (hasNext()) {
155          WritableUtils.cloneInto(key, khead);
156          WritableUtils.cloneInto(value, vhead);
157          next();
158          return true;
159        }
160        return false;
161      }
162    
163      /**
164       * Request new key from proxied RR.
165       */
166      public K createKey() {
167        return rr.createKey();
168      }
169    
170      /**
171       * Request new value from proxied RR.
172       */
173      public U createValue() {
174        return rr.createValue();
175      }
176    
177      /**
178       * Request progress from proxied RR.
179       */
180      public float getProgress() throws IOException {
181        return rr.getProgress();
182      }
183    
184      /**
185       * Request position from proxied RR.
186       */
187      public long getPos() throws IOException {
188        return rr.getPos();
189      }
190    
191      /**
192       * Forward close request to proxied RR.
193       */
194      public void close() throws IOException {
195        rr.close();
196      }
197    
198      /**
199       * Implement Comparable contract (compare key at head of proxied RR
200       * with that of another).
201       */
202      public int compareTo(ComposableRecordReader<K,?> other) {
203        return cmp.compare(key(), other.key());
204      }
205    
206      /**
207       * Return true iff compareTo(other) retn true.
208       */
209      @SuppressWarnings("unchecked") // Explicit type check prior to cast
210      public boolean equals(Object other) {
211        return other instanceof ComposableRecordReader
212            && 0 == compareTo((ComposableRecordReader)other);
213      }
214    
215      public int hashCode() {
216        assert false : "hashCode not designed";
217        return 42;
218      }
219    
220      @Override
221      public void setConf(Configuration conf) {
222        this.conf = conf;
223      }
224    
225      @Override
226      public Configuration getConf() {
227        return conf;
228      }
229    }