001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configurable;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.io.Writable;
028 import org.apache.hadoop.io.WritableComparable;
029 import org.apache.hadoop.io.WritableComparator;
030 import org.apache.hadoop.io.WritableUtils;
031 import org.apache.hadoop.mapred.RecordReader;
032
033 /**
034 * Proxy class for a RecordReader participating in the join framework.
035 * This class keeps track of the "head" key-value pair for the
036 * provided RecordReader and keeps a store of values matching a key when
037 * this source is participating in a join.
038 */
039 @InterfaceAudience.Public
040 @InterfaceStability.Stable
041 public class WrappedRecordReader<K extends WritableComparable,
042 U extends Writable>
043 implements ComposableRecordReader<K,U>, Configurable {
044
045 private boolean empty = false;
046 private RecordReader<K,U> rr;
047 private int id; // index at which values will be inserted in collector
048
049 private K khead; // key at the top of this RR
050 private U vhead; // value assoc with khead
051 private WritableComparator cmp;
052 private Configuration conf;
053
054 private ResetableIterator<U> vjoin;
055
056 /**
057 * For a given RecordReader rr, occupy position id in collector.
058 */
059 WrappedRecordReader(int id, RecordReader<K,U> rr,
060 Class<? extends WritableComparator> cmpcl) throws IOException {
061 this(id, rr, cmpcl, null);
062 }
063
064 WrappedRecordReader(int id, RecordReader<K,U> rr,
065 Class<? extends WritableComparator> cmpcl,
066 Configuration conf) throws IOException {
067 this.id = id;
068 this.rr = rr;
069 this.conf = (conf == null) ? new Configuration() : conf;
070 khead = rr.createKey();
071 vhead = rr.createValue();
072 try {
073 cmp = (null == cmpcl)
074 ? WritableComparator.get(khead.getClass(), this.conf)
075 : cmpcl.newInstance();
076 } catch (InstantiationException e) {
077 throw (IOException)new IOException().initCause(e);
078 } catch (IllegalAccessException e) {
079 throw (IOException)new IOException().initCause(e);
080 }
081 vjoin = new StreamBackedIterator<U>();
082 next();
083 }
084
085 /** {@inheritDoc} */
086 public int id() {
087 return id;
088 }
089
090 /**
091 * Return the key at the head of this RR.
092 */
093 public K key() {
094 return khead;
095 }
096
097 /**
098 * Clone the key at the head of this RR into the object supplied.
099 */
100 public void key(K qkey) throws IOException {
101 WritableUtils.cloneInto(qkey, khead);
102 }
103
104 /**
105 * Return true if the RR- including the k,v pair stored in this object-
106 * is exhausted.
107 */
108 public boolean hasNext() {
109 return !empty;
110 }
111
112 /**
113 * Skip key-value pairs with keys less than or equal to the key provided.
114 */
115 public void skip(K key) throws IOException {
116 if (hasNext()) {
117 while (cmp.compare(khead, key) <= 0 && next());
118 }
119 }
120
121 /**
122 * Read the next k,v pair into the head of this object; return true iff
123 * the RR and this are exhausted.
124 */
125 protected boolean next() throws IOException {
126 empty = !rr.next(khead, vhead);
127 return hasNext();
128 }
129
130 /**
131 * Add an iterator to the collector at the position occupied by this
132 * RecordReader over the values in this stream paired with the key
133 * provided (ie register a stream of values from this source matching K
134 * with a collector).
135 */
136 // JoinCollector comes from parent, which has
137 @SuppressWarnings("unchecked") // no static type for the slot this sits in
138 public void accept(CompositeRecordReader.JoinCollector i, K key)
139 throws IOException {
140 vjoin.clear();
141 if (0 == cmp.compare(key, khead)) {
142 do {
143 vjoin.add(vhead);
144 } while (next() && 0 == cmp.compare(key, khead));
145 }
146 i.add(id, vjoin);
147 }
148
149 /**
150 * Write key-value pair at the head of this stream to the objects provided;
151 * get next key-value pair from proxied RR.
152 */
153 public boolean next(K key, U value) throws IOException {
154 if (hasNext()) {
155 WritableUtils.cloneInto(key, khead);
156 WritableUtils.cloneInto(value, vhead);
157 next();
158 return true;
159 }
160 return false;
161 }
162
163 /**
164 * Request new key from proxied RR.
165 */
166 public K createKey() {
167 return rr.createKey();
168 }
169
170 /**
171 * Request new value from proxied RR.
172 */
173 public U createValue() {
174 return rr.createValue();
175 }
176
177 /**
178 * Request progress from proxied RR.
179 */
180 public float getProgress() throws IOException {
181 return rr.getProgress();
182 }
183
184 /**
185 * Request position from proxied RR.
186 */
187 public long getPos() throws IOException {
188 return rr.getPos();
189 }
190
191 /**
192 * Forward close request to proxied RR.
193 */
194 public void close() throws IOException {
195 rr.close();
196 }
197
198 /**
199 * Implement Comparable contract (compare key at head of proxied RR
200 * with that of another).
201 */
202 public int compareTo(ComposableRecordReader<K,?> other) {
203 return cmp.compare(key(), other.key());
204 }
205
206 /**
207 * Return true iff compareTo(other) retn true.
208 */
209 @SuppressWarnings("unchecked") // Explicit type check prior to cast
210 public boolean equals(Object other) {
211 return other instanceof ComposableRecordReader
212 && 0 == compareTo((ComposableRecordReader)other);
213 }
214
215 public int hashCode() {
216 assert false : "hashCode not designed";
217 return 42;
218 }
219
220 @Override
221 public void setConf(Configuration conf) {
222 this.conf = conf;
223 }
224
225 @Override
226 public Configuration getConf() {
227 return conf;
228 }
229 }