001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.io.NullWritable;
027import org.apache.hadoop.io.Writable;
028import org.apache.hadoop.io.WritableComparable;
029import org.apache.hadoop.io.WritableComparator;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.RecordReader;
032import org.apache.hadoop.mapreduce.TaskAttemptContext;
033import org.apache.hadoop.util.ReflectionUtils;
034
035/**
036 * Proxy class for a RecordReader participating in the join framework.
037 * 
038 * This class keeps track of the "head" key-value pair for the
039 * provided RecordReader and keeps a store of values matching a key when
040 * this source is participating in a join.
041 */
042@InterfaceAudience.Public
043@InterfaceStability.Stable
044public class WrappedRecordReader<K extends WritableComparable<?>,
045    U extends Writable> extends ComposableRecordReader<K,U> {
046
047  protected boolean empty = false;
048  private RecordReader<K,U> rr;
049  private int id;  // index at which values will be inserted in collector
050
051  protected WritableComparator cmp = null;
052  private K key; // key at the top of this RR
053  private U value; // value assoc with key
054  private ResetableIterator<U> vjoin;
055  private Configuration conf = new Configuration();
056  @SuppressWarnings("unchecked")
057  private Class<? extends WritableComparable> keyclass = null; 
058  private Class<? extends Writable> valueclass = null; 
059  
060  protected WrappedRecordReader(int id) {
061    this.id = id;
062    vjoin = new StreamBackedIterator<U>();
063  }
064  
065  /**
066   * For a given RecordReader rr, occupy position id in collector.
067   */
068  WrappedRecordReader(int id, RecordReader<K,U> rr,
069      Class<? extends WritableComparator> cmpcl) 
070  throws IOException, InterruptedException {
071    this.id = id;
072    this.rr = rr;
073    if (cmpcl != null) {
074      try {
075        this.cmp = cmpcl.newInstance();
076      } catch (InstantiationException e) {
077        throw new IOException(e);
078      } catch (IllegalAccessException e) {
079        throw new IOException(e);
080      }
081    }
082    vjoin = new StreamBackedIterator<U>();
083  }
084
085  public void initialize(InputSplit split,
086                         TaskAttemptContext context)
087  throws IOException, InterruptedException {
088    rr.initialize(split, context);
089    conf = context.getConfiguration();
090    nextKeyValue();
091    if (!empty) {
092      keyclass = key.getClass().asSubclass(WritableComparable.class);
093      valueclass = value.getClass();
094      if (cmp == null) {
095        cmp = WritableComparator.get(keyclass, conf);
096      }
097    }
098  }
099
100  /**
101   * Request new key from proxied RR.
102   */
103  @SuppressWarnings("unchecked")
104  public K createKey() {
105    if (keyclass != null) {
106      return (K) ReflectionUtils.newInstance(keyclass, conf);
107    }
108    return (K) NullWritable.get();
109  }
110  
111  @SuppressWarnings("unchecked")
112  public U createValue() {
113    if (valueclass != null) {
114      return (U) ReflectionUtils.newInstance(valueclass, conf);
115    }
116    return (U) NullWritable.get();
117  }
118  
119  /** {@inheritDoc} */
120  public int id() {
121    return id;
122  }
123
124  /**
125   * Return the key at the head of this RR.
126   */
127  public K key() {
128    return key;
129  }
130
131  /**
132   * Clone the key at the head of this RR into the object supplied.
133   */
134  public void key(K qkey) throws IOException {
135    ReflectionUtils.copy(conf, key, qkey);
136  }
137
138  /**
139   * Return true if the RR- including the k,v pair stored in this object-
140   * is exhausted.
141   */
142  public boolean hasNext() {
143    return !empty;
144  }
145
146  /**
147   * Skip key-value pairs with keys less than or equal to the key provided.
148   */
149  public void skip(K key) throws IOException, InterruptedException {
150    if (hasNext()) {
151      while (cmp.compare(key(), key) <= 0 && next());
152    }
153  }
154
155  /**
156   * Add an iterator to the collector at the position occupied by this
157   * RecordReader over the values in this stream paired with the key
158   * provided (ie register a stream of values from this source matching K
159   * with a collector).
160   */
161  @SuppressWarnings("unchecked")
162  public void accept(CompositeRecordReader.JoinCollector i, K key)
163      throws IOException, InterruptedException {
164    vjoin.clear();
165    if (key() != null && 0 == cmp.compare(key, key())) {
166      do {
167        vjoin.add(value);
168      } while (next() && 0 == cmp.compare(key, key()));
169    }
170    i.add(id, vjoin);
171  }
172
173  /**
174   * Read the next k,v pair into the head of this object; return true iff
175   * the RR and this are exhausted.
176   */
177  public boolean nextKeyValue() throws IOException, InterruptedException {
178    if (hasNext()) {
179      next();
180      return true;
181    }
182    return false;
183  }
184
185  /**
186   * Read the next k,v pair into the head of this object; return true iff
187   * the RR and this are exhausted.
188   */
189  private boolean next() throws IOException, InterruptedException {
190    empty = !rr.nextKeyValue();
191    key = rr.getCurrentKey();
192    value = rr.getCurrentValue();
193    return !empty;
194  }
195
196  /**
197   * Get current key 
198   */
199  public K getCurrentKey() throws IOException, InterruptedException {
200    return rr.getCurrentKey();
201  }
202
203  /**
204   * Get current value
205   */
206  public U getCurrentValue() throws IOException, InterruptedException {
207    return rr.getCurrentValue();
208  }
209
210  /**
211   * Request progress from proxied RR.
212   */
213  public float getProgress() throws IOException, InterruptedException {
214    return rr.getProgress();
215  }
216
217  /**
218   * Forward close request to proxied RR.
219   */
220  public void close() throws IOException {
221    rr.close();
222  }
223
224  /**
225   * Implement Comparable contract (compare key at head of proxied RR
226   * with that of another).
227   */
228  public int compareTo(ComposableRecordReader<K,?> other) {
229    return cmp.compare(key(), other.key());
230  }
231
232  /**
233   * Return true iff compareTo(other) retn true.
234   */
235  @SuppressWarnings("unchecked") // Explicit type check prior to cast
236  public boolean equals(Object other) {
237    return other instanceof ComposableRecordReader
238        && 0 == compareTo((ComposableRecordReader)other);
239  }
240
241  public int hashCode() {
242    assert false : "hashCode not designed";
243    return 42;
244  }
245}