001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.NullWritable;
027    import org.apache.hadoop.io.Writable;
028    import org.apache.hadoop.io.WritableComparable;
029    import org.apache.hadoop.io.WritableComparator;
030    import org.apache.hadoop.mapreduce.InputSplit;
031    import org.apache.hadoop.mapreduce.RecordReader;
032    import org.apache.hadoop.mapreduce.TaskAttemptContext;
033    import org.apache.hadoop.util.ReflectionUtils;
034    
035    /**
036     * Proxy class for a RecordReader participating in the join framework.
037     * 
038     * This class keeps track of the "head" key-value pair for the
039     * provided RecordReader and keeps a store of values matching a key when
040     * this source is participating in a join.
041     */
042    @InterfaceAudience.Public
043    @InterfaceStability.Stable
044    public class WrappedRecordReader<K extends WritableComparable<?>,
045        U extends Writable> extends ComposableRecordReader<K,U> {
046    
047      protected boolean empty = false;
048      private RecordReader<K,U> rr;
049      private int id;  // index at which values will be inserted in collector
050    
051      protected WritableComparator cmp = null;
052      private K key; // key at the top of this RR
053      private U value; // value assoc with key
054      private ResetableIterator<U> vjoin;
055      private Configuration conf = new Configuration();
056      @SuppressWarnings("unchecked")
057      private Class<? extends WritableComparable> keyclass = null; 
058      private Class<? extends Writable> valueclass = null; 
059      
060      protected WrappedRecordReader(int id) {
061        this.id = id;
062        vjoin = new StreamBackedIterator<U>();
063      }
064      
065      /**
066       * For a given RecordReader rr, occupy position id in collector.
067       */
068      WrappedRecordReader(int id, RecordReader<K,U> rr,
069          Class<? extends WritableComparator> cmpcl) 
070      throws IOException, InterruptedException {
071        this.id = id;
072        this.rr = rr;
073        if (cmpcl != null) {
074          try {
075            this.cmp = cmpcl.newInstance();
076          } catch (InstantiationException e) {
077            throw new IOException(e);
078          } catch (IllegalAccessException e) {
079            throw new IOException(e);
080          }
081        }
082        vjoin = new StreamBackedIterator<U>();
083      }
084    
085      public void initialize(InputSplit split,
086                             TaskAttemptContext context)
087      throws IOException, InterruptedException {
088        rr.initialize(split, context);
089        conf = context.getConfiguration();
090        nextKeyValue();
091        if (!empty) {
092          keyclass = key.getClass().asSubclass(WritableComparable.class);
093          valueclass = value.getClass();
094          if (cmp == null) {
095            cmp = WritableComparator.get(keyclass);
096          }
097        }
098      }
099    
100      /**
101       * Request new key from proxied RR.
102       */
103      @SuppressWarnings("unchecked")
104      public K createKey() {
105        if (keyclass != null) {
106          return (K) ReflectionUtils.newInstance(keyclass, conf);
107        }
108        return (K) NullWritable.get();
109      }
110      
111      @SuppressWarnings("unchecked")
112      public U createValue() {
113        if (valueclass != null) {
114          return (U) ReflectionUtils.newInstance(valueclass, conf);
115        }
116        return (U) NullWritable.get();
117      }
118      
119      /** {@inheritDoc} */
120      public int id() {
121        return id;
122      }
123    
124      /**
125       * Return the key at the head of this RR.
126       */
127      public K key() {
128        return key;
129      }
130    
131      /**
132       * Clone the key at the head of this RR into the object supplied.
133       */
134      public void key(K qkey) throws IOException {
135        ReflectionUtils.copy(conf, key, qkey);
136      }
137    
138      /**
139       * Return true if the RR- including the k,v pair stored in this object-
140       * is exhausted.
141       */
142      public boolean hasNext() {
143        return !empty;
144      }
145    
146      /**
147       * Skip key-value pairs with keys less than or equal to the key provided.
148       */
149      public void skip(K key) throws IOException, InterruptedException {
150        if (hasNext()) {
151          while (cmp.compare(key(), key) <= 0 && next());
152        }
153      }
154    
155      /**
156       * Add an iterator to the collector at the position occupied by this
157       * RecordReader over the values in this stream paired with the key
158       * provided (ie register a stream of values from this source matching K
159       * with a collector).
160       */
161      @SuppressWarnings("unchecked")
162      public void accept(CompositeRecordReader.JoinCollector i, K key)
163          throws IOException, InterruptedException {
164        vjoin.clear();
165        if (key() != null && 0 == cmp.compare(key, key())) {
166          do {
167            vjoin.add(value);
168          } while (next() && 0 == cmp.compare(key, key()));
169        }
170        i.add(id, vjoin);
171      }
172    
173      /**
174       * Read the next k,v pair into the head of this object; return true iff
175       * the RR and this are exhausted.
176       */
177      public boolean nextKeyValue() throws IOException, InterruptedException {
178        if (hasNext()) {
179          next();
180          return true;
181        }
182        return false;
183      }
184    
185      /**
186       * Read the next k,v pair into the head of this object; return true iff
187       * the RR and this are exhausted.
188       */
189      private boolean next() throws IOException, InterruptedException {
190        empty = !rr.nextKeyValue();
191        key = rr.getCurrentKey();
192        value = rr.getCurrentValue();
193        return !empty;
194      }
195    
196      /**
197       * Get current key 
198       */
199      public K getCurrentKey() throws IOException, InterruptedException {
200        return rr.getCurrentKey();
201      }
202    
203      /**
204       * Get current value
205       */
206      public U getCurrentValue() throws IOException, InterruptedException {
207        return rr.getCurrentValue();
208      }
209    
210      /**
211       * Request progress from proxied RR.
212       */
213      public float getProgress() throws IOException, InterruptedException {
214        return rr.getProgress();
215      }
216    
217      /**
218       * Forward close request to proxied RR.
219       */
220      public void close() throws IOException {
221        rr.close();
222      }
223    
224      /**
225       * Implement Comparable contract (compare key at head of proxied RR
226       * with that of another).
227       */
228      public int compareTo(ComposableRecordReader<K,?> other) {
229        return cmp.compare(key(), other.key());
230      }
231    
232      /**
233       * Return true iff compareTo(other) retn true.
234       */
235      @SuppressWarnings("unchecked") // Explicit type check prior to cast
236      public boolean equals(Object other) {
237        return other instanceof ComposableRecordReader
238            && 0 == compareTo((ComposableRecordReader)other);
239      }
240    
241      public int hashCode() {
242        assert false : "hashCode not designed";
243        return 42;
244      }
245    }