/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.join;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
 *
 * This is *not* a general-purpose tuple type. In almost all cases, users are
 * encouraged to implement their own serializable types, which can perform
 * better validation and provide more efficient encodings than this class is
 * capable of. TupleWritable relies on the join framework for type safety and
 * assumes its instances will rarely be persisted, assumptions not only
 * incompatible with, but contrary to the general case.
 *
 * @see org.apache.hadoop.io.Writable
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TupleWritable implements Writable, Iterable<Writable> {

  /** Bit {@code i} is set when position {@code i} holds a "written" value. */
  protected BitSet written;

  /** Backing storage, one slot per tuple position. */
  private Writable[] values;

  /**
   * Create an empty tuple with no allocated storage for writables.
   */
  public TupleWritable() {
    written = new BitSet(0);
    // A zero-length array (rather than null) keeps size(), equals(),
    // toString() and write() usable on a freshly constructed tuple.
    values = new Writable[0];
  }

  /**
   * Initialize tuple with storage; unknown whether any of them contain
   * "written" values.
   *
   * @param vals backing array; it is stored as-is, not copied
   */
  public TupleWritable(Writable[] vals) {
    written = new BitSet(vals.length);
    values = vals;
  }

  /**
   * Return true if tuple has an element at the position provided.
   *
   * @param i position to test
   * @return whether position {@code i} is marked as written
   */
  public boolean has(int i) {
    return written.get(i);
  }

  /**
   * Get ith Writable from Tuple.
   *
   * @param i position to read
   * @return the writable stored at position {@code i}, regardless of whether
   *         that position is marked as written
   */
  public Writable get(int i) {
    return values[i];
  }

  /**
   * The number of children in this Tuple.
   *
   * @return the length of the backing storage
   */
  public int size() {
    return values.length;
  }

  /**
   * Two tuples are equal when they have the same set of written positions
   * and the values at every written position are equal. Values at positions
   * that are not marked as written are ignored.
   */
  @Override
  public boolean equals(Object other) {
    if (!(other instanceof TupleWritable)) {
      return false;
    }
    TupleWritable that = (TupleWritable) other;
    if (!this.written.equals(that.written)) {
      return false;
    }
    for (int i = 0; i < values.length; ++i) {
      if (has(i) && !values[i].equals(that.get(i))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Hash derived from the set of written positions only. The assertion
   * fails whenever assertions are enabled, signalling that this type was
   * not designed to be hashed.
   */
  @Override
  public int hashCode() {
    assert false : "hashCode not designed";
    return written.hashCode();
  }

  /**
   * Return an iterator over the elements in this tuple.
   * Only positions marked as written are visited.
   * Note that this doesn't flatten the tuple; one may receive tuples
   * from this iterator.
   */
  @Override
  public Iterator<Writable> iterator() {
    return new Iterator<Writable>() {
      /** Position that the next call to next() will return, or -1. */
      private int nextIndex = written.nextSetBit(0);
      /** Position returned by the latest next(), or -1 if none/removed. */
      private int lastReturned = -1;

      @Override
      public boolean hasNext() {
        return nextIndex >= 0;
      }

      @Override
      public Writable next() {
        if (nextIndex < 0) {
          throw new NoSuchElementException();
        }
        lastReturned = nextIndex;
        nextIndex = written.nextSetBit(nextIndex + 1);
        return TupleWritable.this.get(lastReturned);
      }

      @Override
      public void remove() {
        // Act on the element most recently returned by next(), not on the
        // upcoming one; the latter is -1 once iteration is exhausted.
        if (lastReturned < 0 || !written.get(lastReturned)) {
          throw new IllegalStateException(
              "Attempt to remove non-existent val");
        }
        written.clear(lastReturned);
        lastReturned = -1;
      }
    };
  }

  /**
   * Convert Tuple to String as in the following.
   * {@code [<child1>,<child2>,...,<childn>]}
   * Positions that are not marked as written are rendered as empty strings.
   */
  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder("[");
    for (int i = 0; i < values.length; ++i) {
      if (i > 0) {
        buf.append(',');
      }
      if (has(i)) {
        buf.append(values[i].toString());
      }
    }
    return buf.append(']').toString();
  }

  // Writable

  /** Writes each Writable to <code>out</code>.
   * TupleWritable format:
   * {@code
   *  <count><written-bits><type1><type2>...<typen><obj1><obj2>...<objn>
   * }
   * The class name is written for every position; the object itself is
   * written only for positions marked as written.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, values.length);
    writeBitSet(out, values.length, written);
    for (int i = 0; i < values.length; ++i) {
      Text.writeString(out, values[i].getClass().getName());
    }
    for (int i = 0; i < values.length; ++i) {
      if (has(i)) {
        values[i].write(out);
      }
    }
  }

  /**
   * Replaces the contents of this tuple with the tuple read from
   * <code>in</code>, in the format produced by {@link #write(DataOutput)}.
   *
   * @throws IOException if the stream cannot be read, or if a child class
   *         named in the stream cannot be loaded or instantiated
   */
  @Override
  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
  public void readFields(DataInput in) throws IOException {
    int card = WritableUtils.readVInt(in);
    values = new Writable[card];
    readBitSet(in, card, written);
    Class<? extends Writable>[] cls = new Class[card];
    try {
      // NOTE(review): class names come straight from the stream and are
      // loaded reflectively; this is only appropriate for trusted input.
      for (int i = 0; i < card; ++i) {
        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
      }
      for (int i = 0; i < card; ++i) {
        if (cls[i].equals(NullWritable.class)) {
          // NullWritable is obtained through its accessor, not reflection.
          values[i] = NullWritable.get();
        } else {
          values[i] = cls[i].newInstance();
        }
        if (has(i)) {
          values[i].readFields(in);
        }
      }
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed tuple init", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Failed tuple init", e);
    } catch (InstantiationException e) {
      throw new IOException("Failed tuple init", e);
    }
  }

  /**
   * Record that the tuple contains an element at the position provided.
   */
  void setWritten(int i) {
    written.set(i);
  }

  /**
   * Record that the tuple does not contain an element at the position
   * provided.
   */
  void clearWritten(int i) {
    written.clear(i);
  }

  /**
   * Clear any record of which writables have been written to, without
   * releasing storage.
   */
  void clearWritten() {
    written.clear();
  }

  /**
   * Writes the bit set to the stream. The first 64 bit-positions of the bit
   * set are written as a VLong for backwards-compatibility with older
   * versions of TupleWritable. All bit-positions &gt;= 64 are encoded as a
   * byte for every 8 bit-positions, and exactly
   * {@code ceil((nbits - 64) / 8)} such bytes are written, which is the
   * number {@link #readBitSet(DataInput, int, BitSet)} consumes.
   *
   * @param stream destination
   * @param nbits number of tuple positions being described
   * @param bitSet positions marked as written
   */
  private static final void writeBitSet(DataOutput stream, int nbits,
      BitSet bitSet) throws IOException {
    long lowBits = 0L;
    for (int i = bitSet.nextSetBit(0); i >= 0 && i < Long.SIZE;
        i = bitSet.nextSetBit(i + 1)) {
      lowBits |= 1L << i;
    }
    WritableUtils.writeVLong(stream, lowBits);

    // One byte per group of 8 positions, including all-zero groups after
    // the highest set bit, so the reader never runs into the data that
    // follows the bit set.
    for (int offset = Long.SIZE; offset < nbits; offset += Byte.SIZE) {
      int width = Math.min(Byte.SIZE, nbits - offset);
      int packed = 0;
      for (int bit = 0; bit < width; ++bit) {
        if (bitSet.get(offset + bit)) {
          packed |= 1 << bit;
        }
      }
      stream.writeByte(packed);
    }
  }

  /**
   * Reads a bitset from the stream that has been written with
   * {@link #writeBitSet(DataOutput, int, BitSet)}.
   *
   * @param stream source
   * @param nbits number of tuple positions described by the stream
   * @param bitSet cleared, then populated with the positions read
   */
  private static final void readBitSet(DataInput stream, int nbits,
      BitSet bitSet) throws IOException {
    bitSet.clear();

    long lowBits = WritableUtils.readVLong(stream);
    while (lowBits != 0L) {
      long lowest = Long.lowestOneBit(lowBits);
      lowBits ^= lowest;
      bitSet.set(Long.numberOfTrailingZeros(lowest));
    }

    for (int offset = Long.SIZE; offset < nbits; offset += Byte.SIZE) {
      // Mask to an unsigned value so the top bit of the byte is handled
      // like any other.
      int bits = stream.readByte() & 0xFF;
      while (bits != 0) {
        int lowest = Integer.lowestOneBit(bits);
        bits ^= lowest;
        bitSet.set(offset + Integer.numberOfTrailingZeros(lowest));
      }
    }
  }
}