001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.join; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.io.Writable; 026import org.apache.hadoop.io.WritableComparable; 027import org.apache.hadoop.io.WritableComparator; 028import org.apache.hadoop.io.WritableUtils; 029import org.apache.hadoop.mapred.RecordReader; 030 031/** 032 * Proxy class for a RecordReader participating in the join framework. 033 * This class keeps track of the "head" key-value pair for the 034 * provided RecordReader and keeps a store of values matching a key when 035 * this source is participating in a join. 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Stable 039public class WrappedRecordReader<K extends WritableComparable, 040 U extends Writable> 041 implements ComposableRecordReader<K,U> { 042 043 private boolean empty = false; 044 private RecordReader<K,U> rr; 045 private int id; // index at which values will be inserted in collector 046 047 private K khead; // key at the top of this RR 048 private U vhead; // value assoc with khead 049 private WritableComparator cmp; 050 051 private ResetableIterator<U> vjoin; 052 053 /** 054 * For a given RecordReader rr, occupy position id in collector. 055 */ 056 WrappedRecordReader(int id, RecordReader<K,U> rr, 057 Class<? extends WritableComparator> cmpcl) throws IOException { 058 this.id = id; 059 this.rr = rr; 060 khead = rr.createKey(); 061 vhead = rr.createValue(); 062 try { 063 cmp = (null == cmpcl) 064 ? WritableComparator.get(khead.getClass()) 065 : cmpcl.newInstance(); 066 } catch (InstantiationException e) { 067 throw (IOException)new IOException().initCause(e); 068 } catch (IllegalAccessException e) { 069 throw (IOException)new IOException().initCause(e); 070 } 071 vjoin = new StreamBackedIterator<U>(); 072 next(); 073 } 074 075 /** {@inheritDoc} */ 076 public int id() { 077 return id; 078 } 079 080 /** 081 * Return the key at the head of this RR. 082 */ 083 public K key() { 084 return khead; 085 } 086 087 /** 088 * Clone the key at the head of this RR into the object supplied. 089 */ 090 public void key(K qkey) throws IOException { 091 WritableUtils.cloneInto(qkey, khead); 092 } 093 094 /** 095 * Return true if the RR- including the k,v pair stored in this object- 096 * is exhausted. 097 */ 098 public boolean hasNext() { 099 return !empty; 100 } 101 102 /** 103 * Skip key-value pairs with keys less than or equal to the key provided. 104 */ 105 public void skip(K key) throws IOException { 106 if (hasNext()) { 107 while (cmp.compare(khead, key) <= 0 && next()); 108 } 109 } 110 111 /** 112 * Read the next k,v pair into the head of this object; return true iff 113 * the RR and this are exhausted. 114 */ 115 protected boolean next() throws IOException { 116 empty = !rr.next(khead, vhead); 117 return hasNext(); 118 } 119 120 /** 121 * Add an iterator to the collector at the position occupied by this 122 * RecordReader over the values in this stream paired with the key 123 * provided (ie register a stream of values from this source matching K 124 * with a collector). 125 */ 126 // JoinCollector comes from parent, which has 127 @SuppressWarnings("unchecked") // no static type for the slot this sits in 128 public void accept(CompositeRecordReader.JoinCollector i, K key) 129 throws IOException { 130 vjoin.clear(); 131 if (0 == cmp.compare(key, khead)) { 132 do { 133 vjoin.add(vhead); 134 } while (next() && 0 == cmp.compare(key, khead)); 135 } 136 i.add(id, vjoin); 137 } 138 139 /** 140 * Write key-value pair at the head of this stream to the objects provided; 141 * get next key-value pair from proxied RR. 142 */ 143 public boolean next(K key, U value) throws IOException { 144 if (hasNext()) { 145 WritableUtils.cloneInto(key, khead); 146 WritableUtils.cloneInto(value, vhead); 147 next(); 148 return true; 149 } 150 return false; 151 } 152 153 /** 154 * Request new key from proxied RR. 155 */ 156 public K createKey() { 157 return rr.createKey(); 158 } 159 160 /** 161 * Request new value from proxied RR. 162 */ 163 public U createValue() { 164 return rr.createValue(); 165 } 166 167 /** 168 * Request progress from proxied RR. 169 */ 170 public float getProgress() throws IOException { 171 return rr.getProgress(); 172 } 173 174 /** 175 * Request position from proxied RR. 176 */ 177 public long getPos() throws IOException { 178 return rr.getPos(); 179 } 180 181 /** 182 * Forward close request to proxied RR. 183 */ 184 public void close() throws IOException { 185 rr.close(); 186 } 187 188 /** 189 * Implement Comparable contract (compare key at head of proxied RR 190 * with that of another). 191 */ 192 public int compareTo(ComposableRecordReader<K,?> other) { 193 return cmp.compare(key(), other.key()); 194 } 195 196 /** 197 * Return true iff compareTo(other) retn true. 198 */ 199 @SuppressWarnings("unchecked") // Explicit type check prior to cast 200 public boolean equals(Object other) { 201 return other instanceof ComposableRecordReader 202 && 0 == compareTo((ComposableRecordReader)other); 203 } 204 205 public int hashCode() { 206 assert false : "hashCode not designed"; 207 return 42; 208 } 209 210}