001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.join;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.io.Writable;
026import org.apache.hadoop.io.WritableComparable;
027import org.apache.hadoop.io.WritableComparator;
028import org.apache.hadoop.io.WritableUtils;
029import org.apache.hadoop.mapred.RecordReader;
030
031/**
032 * Proxy class for a RecordReader participating in the join framework.
033 * This class keeps track of the "head" key-value pair for the
034 * provided RecordReader and keeps a store of values matching a key when
035 * this source is participating in a join.
036 */
037@InterfaceAudience.Public
038@InterfaceStability.Stable
039public class WrappedRecordReader<K extends WritableComparable,
040                          U extends Writable>
041    implements ComposableRecordReader<K,U> {
042
043  private boolean empty = false;
044  private RecordReader<K,U> rr;
045  private int id;  // index at which values will be inserted in collector
046
047  private K khead; // key at the top of this RR
048  private U vhead; // value assoc with khead
049  private WritableComparator cmp;
050
051  private ResetableIterator<U> vjoin;
052
053  /**
054   * For a given RecordReader rr, occupy position id in collector.
055   */
056  WrappedRecordReader(int id, RecordReader<K,U> rr,
057      Class<? extends WritableComparator> cmpcl) throws IOException {
058    this.id = id;
059    this.rr = rr;
060    khead = rr.createKey();
061    vhead = rr.createValue();
062    try {
063      cmp = (null == cmpcl)
064        ? WritableComparator.get(khead.getClass())
065        : cmpcl.newInstance();
066    } catch (InstantiationException e) {
067      throw (IOException)new IOException().initCause(e);
068    } catch (IllegalAccessException e) {
069      throw (IOException)new IOException().initCause(e);
070    }
071    vjoin = new StreamBackedIterator<U>();
072    next();
073  }
074
075  /** {@inheritDoc} */
076  public int id() {
077    return id;
078  }
079
080  /**
081   * Return the key at the head of this RR.
082   */
083  public K key() {
084    return khead;
085  }
086
087  /**
088   * Clone the key at the head of this RR into the object supplied.
089   */
090  public void key(K qkey) throws IOException {
091    WritableUtils.cloneInto(qkey, khead);
092  }
093
094  /**
095   * Return true if the RR- including the k,v pair stored in this object-
096   * is exhausted.
097   */
098  public boolean hasNext() {
099    return !empty;
100  }
101
102  /**
103   * Skip key-value pairs with keys less than or equal to the key provided.
104   */
105  public void skip(K key) throws IOException {
106    if (hasNext()) {
107      while (cmp.compare(khead, key) <= 0 && next());
108    }
109  }
110
111  /**
112   * Read the next k,v pair into the head of this object; return true iff
113   * the RR and this are exhausted.
114   */
115  protected boolean next() throws IOException {
116    empty = !rr.next(khead, vhead);
117    return hasNext();
118  }
119
120  /**
121   * Add an iterator to the collector at the position occupied by this
122   * RecordReader over the values in this stream paired with the key
123   * provided (ie register a stream of values from this source matching K
124   * with a collector).
125   */
126                                 // JoinCollector comes from parent, which has
127  @SuppressWarnings("unchecked") // no static type for the slot this sits in
128  public void accept(CompositeRecordReader.JoinCollector i, K key)
129      throws IOException {
130    vjoin.clear();
131    if (0 == cmp.compare(key, khead)) {
132      do {
133        vjoin.add(vhead);
134      } while (next() && 0 == cmp.compare(key, khead));
135    }
136    i.add(id, vjoin);
137  }
138
139  /**
140   * Write key-value pair at the head of this stream to the objects provided;
141   * get next key-value pair from proxied RR.
142   */
143  public boolean next(K key, U value) throws IOException {
144    if (hasNext()) {
145      WritableUtils.cloneInto(key, khead);
146      WritableUtils.cloneInto(value, vhead);
147      next();
148      return true;
149    }
150    return false;
151  }
152
153  /**
154   * Request new key from proxied RR.
155   */
156  public K createKey() {
157    return rr.createKey();
158  }
159
160  /**
161   * Request new value from proxied RR.
162   */
163  public U createValue() {
164    return rr.createValue();
165  }
166
167  /**
168   * Request progress from proxied RR.
169   */
170  public float getProgress() throws IOException {
171    return rr.getProgress();
172  }
173
174  /**
175   * Request position from proxied RR.
176   */
177  public long getPos() throws IOException {
178    return rr.getPos();
179  }
180
181  /**
182   * Forward close request to proxied RR.
183   */
184  public void close() throws IOException {
185    rr.close();
186  }
187
188  /**
189   * Implement Comparable contract (compare key at head of proxied RR
190   * with that of another).
191   */
192  public int compareTo(ComposableRecordReader<K,?> other) {
193    return cmp.compare(key(), other.key());
194  }
195
196  /**
197   * Return true iff compareTo(other) retn true.
198   */
199  @SuppressWarnings("unchecked") // Explicit type check prior to cast
200  public boolean equals(Object other) {
201    return other instanceof ComposableRecordReader
202        && 0 == compareTo((ComposableRecordReader)other);
203  }
204
205  public int hashCode() {
206    assert false : "hashCode not designed";
207    return 42;
208  }
209
210}