001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.Writable;
028import org.apache.hadoop.io.WritableComparable;
029import org.apache.hadoop.io.WritableComparator;
030import org.apache.hadoop.io.WritableUtils;
031import org.apache.hadoop.mapred.RecordReader;
032
033/**
034 * Proxy class for a RecordReader participating in the join framework.
035 * This class keeps track of the "head" key-value pair for the
036 * provided RecordReader and keeps a store of values matching a key when
037 * this source is participating in a join.
038 */
039@InterfaceAudience.Public
040@InterfaceStability.Stable
041public class WrappedRecordReader<K extends WritableComparable,
042                          U extends Writable>
043    implements ComposableRecordReader<K,U>, Configurable {
044
045  private boolean empty = false;
046  private RecordReader<K,U> rr;
047  private int id;  // index at which values will be inserted in collector
048
049  private K khead; // key at the top of this RR
050  private U vhead; // value assoc with khead
051  private WritableComparator cmp;
052  private Configuration conf;
053
054  private ResetableIterator<U> vjoin;
055
056  /**
057   * For a given RecordReader rr, occupy position id in collector.
058   */
059  WrappedRecordReader(int id, RecordReader<K,U> rr,
060      Class<? extends WritableComparator> cmpcl) throws IOException {
061    this(id, rr, cmpcl, null);
062  }
063
064  WrappedRecordReader(int id, RecordReader<K,U> rr,
065                      Class<? extends WritableComparator> cmpcl,
066                      Configuration conf) throws IOException {
067    this.id = id;
068    this.rr = rr;
069    this.conf = (conf == null) ? new Configuration() : conf;
070    khead = rr.createKey();
071    vhead = rr.createValue();
072    try {
073      cmp = (null == cmpcl)
074        ? WritableComparator.get(khead.getClass(), this.conf)
075        : cmpcl.newInstance();
076    } catch (InstantiationException e) {
077      throw (IOException)new IOException().initCause(e);
078    } catch (IllegalAccessException e) {
079      throw (IOException)new IOException().initCause(e);
080    }
081    vjoin = new StreamBackedIterator<U>();
082    next();
083  }
084
085  /** {@inheritDoc} */
086  public int id() {
087    return id;
088  }
089
090  /**
091   * Return the key at the head of this RR.
092   */
093  public K key() {
094    return khead;
095  }
096
097  /**
098   * Clone the key at the head of this RR into the object supplied.
099   */
100  public void key(K qkey) throws IOException {
101    WritableUtils.cloneInto(qkey, khead);
102  }
103
104  /**
105   * Return true if the RR- including the k,v pair stored in this object-
106   * is exhausted.
107   */
108  public boolean hasNext() {
109    return !empty;
110  }
111
112  /**
113   * Skip key-value pairs with keys less than or equal to the key provided.
114   */
115  public void skip(K key) throws IOException {
116    if (hasNext()) {
117      while (cmp.compare(khead, key) <= 0 && next());
118    }
119  }
120
121  /**
122   * Read the next k,v pair into the head of this object; return true iff
123   * the RR and this are exhausted.
124   */
125  protected boolean next() throws IOException {
126    empty = !rr.next(khead, vhead);
127    return hasNext();
128  }
129
130  /**
131   * Add an iterator to the collector at the position occupied by this
132   * RecordReader over the values in this stream paired with the key
133   * provided (ie register a stream of values from this source matching K
134   * with a collector).
135   */
136                                 // JoinCollector comes from parent, which has
137  @SuppressWarnings("unchecked") // no static type for the slot this sits in
138  public void accept(CompositeRecordReader.JoinCollector i, K key)
139      throws IOException {
140    vjoin.clear();
141    if (0 == cmp.compare(key, khead)) {
142      do {
143        vjoin.add(vhead);
144      } while (next() && 0 == cmp.compare(key, khead));
145    }
146    i.add(id, vjoin);
147  }
148
149  /**
150   * Write key-value pair at the head of this stream to the objects provided;
151   * get next key-value pair from proxied RR.
152   */
153  public boolean next(K key, U value) throws IOException {
154    if (hasNext()) {
155      WritableUtils.cloneInto(key, khead);
156      WritableUtils.cloneInto(value, vhead);
157      next();
158      return true;
159    }
160    return false;
161  }
162
163  /**
164   * Request new key from proxied RR.
165   */
166  public K createKey() {
167    return rr.createKey();
168  }
169
170  /**
171   * Request new value from proxied RR.
172   */
173  public U createValue() {
174    return rr.createValue();
175  }
176
177  /**
178   * Request progress from proxied RR.
179   */
180  public float getProgress() throws IOException {
181    return rr.getProgress();
182  }
183
184  /**
185   * Request position from proxied RR.
186   */
187  public long getPos() throws IOException {
188    return rr.getPos();
189  }
190
191  /**
192   * Forward close request to proxied RR.
193   */
194  public void close() throws IOException {
195    rr.close();
196  }
197
198  /**
199   * Implement Comparable contract (compare key at head of proxied RR
200   * with that of another).
201   */
202  public int compareTo(ComposableRecordReader<K,?> other) {
203    return cmp.compare(key(), other.key());
204  }
205
206  /**
207   * Return true iff compareTo(other) retn true.
208   */
209  @SuppressWarnings("unchecked") // Explicit type check prior to cast
210  public boolean equals(Object other) {
211    return other instanceof ComposableRecordReader
212        && 0 == compareTo((ComposableRecordReader)other);
213  }
214
215  public int hashCode() {
216    assert false : "hashCode not designed";
217    return 42;
218  }
219
220  @Override
221  public void setConf(Configuration conf) {
222    this.conf = conf;
223  }
224
225  @Override
226  public Configuration getConf() {
227    return conf;
228  }
229}