001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.join; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.io.Writable; 028import org.apache.hadoop.io.WritableComparable; 029import org.apache.hadoop.io.WritableComparator; 030import org.apache.hadoop.io.WritableUtils; 031import org.apache.hadoop.mapred.RecordReader; 032 033/** 034 * Proxy class for a RecordReader participating in the join framework. 035 * This class keeps track of the "head" key-value pair for the 036 * provided RecordReader and keeps a store of values matching a key when 037 * this source is participating in a join. 038 */ 039@InterfaceAudience.Public 040@InterfaceStability.Stable 041public class WrappedRecordReader<K extends WritableComparable, 042 U extends Writable> 043 implements ComposableRecordReader<K,U>, Configurable { 044 045 private boolean empty = false; 046 private RecordReader<K,U> rr; 047 private int id; // index at which values will be inserted in collector 048 049 private K khead; // key at the top of this RR 050 private U vhead; // value assoc with khead 051 private WritableComparator cmp; 052 private Configuration conf; 053 054 private ResetableIterator<U> vjoin; 055 056 /** 057 * For a given RecordReader rr, occupy position id in collector. 058 */ 059 WrappedRecordReader(int id, RecordReader<K,U> rr, 060 Class<? extends WritableComparator> cmpcl) throws IOException { 061 this(id, rr, cmpcl, null); 062 } 063 064 WrappedRecordReader(int id, RecordReader<K,U> rr, 065 Class<? extends WritableComparator> cmpcl, 066 Configuration conf) throws IOException { 067 this.id = id; 068 this.rr = rr; 069 this.conf = (conf == null) ? new Configuration() : conf; 070 khead = rr.createKey(); 071 vhead = rr.createValue(); 072 try { 073 cmp = (null == cmpcl) 074 ? WritableComparator.get(khead.getClass(), this.conf) 075 : cmpcl.newInstance(); 076 } catch (InstantiationException e) { 077 throw (IOException)new IOException().initCause(e); 078 } catch (IllegalAccessException e) { 079 throw (IOException)new IOException().initCause(e); 080 } 081 vjoin = new StreamBackedIterator<U>(); 082 next(); 083 } 084 085 /** {@inheritDoc} */ 086 public int id() { 087 return id; 088 } 089 090 /** 091 * Return the key at the head of this RR. 092 */ 093 public K key() { 094 return khead; 095 } 096 097 /** 098 * Clone the key at the head of this RR into the object supplied. 099 */ 100 public void key(K qkey) throws IOException { 101 WritableUtils.cloneInto(qkey, khead); 102 } 103 104 /** 105 * Return true if the RR- including the k,v pair stored in this object- 106 * is exhausted. 107 */ 108 public boolean hasNext() { 109 return !empty; 110 } 111 112 /** 113 * Skip key-value pairs with keys less than or equal to the key provided. 114 */ 115 public void skip(K key) throws IOException { 116 if (hasNext()) { 117 while (cmp.compare(khead, key) <= 0 && next()); 118 } 119 } 120 121 /** 122 * Read the next k,v pair into the head of this object; return true iff 123 * the RR and this are exhausted. 124 */ 125 protected boolean next() throws IOException { 126 empty = !rr.next(khead, vhead); 127 return hasNext(); 128 } 129 130 /** 131 * Add an iterator to the collector at the position occupied by this 132 * RecordReader over the values in this stream paired with the key 133 * provided (ie register a stream of values from this source matching K 134 * with a collector). 135 */ 136 // JoinCollector comes from parent, which has 137 @SuppressWarnings("unchecked") // no static type for the slot this sits in 138 public void accept(CompositeRecordReader.JoinCollector i, K key) 139 throws IOException { 140 vjoin.clear(); 141 if (0 == cmp.compare(key, khead)) { 142 do { 143 vjoin.add(vhead); 144 } while (next() && 0 == cmp.compare(key, khead)); 145 } 146 i.add(id, vjoin); 147 } 148 149 /** 150 * Write key-value pair at the head of this stream to the objects provided; 151 * get next key-value pair from proxied RR. 152 */ 153 public boolean next(K key, U value) throws IOException { 154 if (hasNext()) { 155 WritableUtils.cloneInto(key, khead); 156 WritableUtils.cloneInto(value, vhead); 157 next(); 158 return true; 159 } 160 return false; 161 } 162 163 /** 164 * Request new key from proxied RR. 165 */ 166 public K createKey() { 167 return rr.createKey(); 168 } 169 170 /** 171 * Request new value from proxied RR. 172 */ 173 public U createValue() { 174 return rr.createValue(); 175 } 176 177 /** 178 * Request progress from proxied RR. 179 */ 180 public float getProgress() throws IOException { 181 return rr.getProgress(); 182 } 183 184 /** 185 * Request position from proxied RR. 186 */ 187 public long getPos() throws IOException { 188 return rr.getPos(); 189 } 190 191 /** 192 * Forward close request to proxied RR. 193 */ 194 public void close() throws IOException { 195 rr.close(); 196 } 197 198 /** 199 * Implement Comparable contract (compare key at head of proxied RR 200 * with that of another). 201 */ 202 public int compareTo(ComposableRecordReader<K,?> other) { 203 return cmp.compare(key(), other.key()); 204 } 205 206 /** 207 * Return true iff compareTo(other) retn true. 208 */ 209 @SuppressWarnings("unchecked") // Explicit type check prior to cast 210 public boolean equals(Object other) { 211 return other instanceof ComposableRecordReader 212 && 0 == compareTo((ComposableRecordReader)other); 213 } 214 215 public int hashCode() { 216 assert false : "hashCode not designed"; 217 return 42; 218 } 219 220 @Override 221 public void setConf(Configuration conf) { 222 this.conf = conf; 223 } 224 225 @Override 226 public Configuration getConf() { 227 return conf; 228 } 229}